summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2012-03-13 15:25:25 +0000
committerTrygve Laugstøl <trygvis@inamo.no>2012-03-13 15:25:25 +0000
commite67fa24d3c21792b3adab3f88e2383219018b696 (patch)
treeef1ab28143ef4f1007ec37926e84ace786ed045e
downloadhttp-mq-e67fa24d3c21792b3adab3f88e2383219018b696.tar.gz
http-mq-e67fa24d3c21792b3adab3f88e2383219018b696.tar.bz2
http-mq-e67fa24d3c21792b3adab3f88e2383219018b696.tar.xz
http-mq-e67fa24d3c21792b3adab3f88e2383219018b696.zip
o Initial import.
-rw-r--r--UrlGenerator.class.php25
-rw-r--r--db.php124
-rw-r--r--endpoint.php36
-rw-r--r--mq.sql24
-rw-r--r--queue.php346
5 files changed, 555 insertions, 0 deletions
diff --git a/UrlGenerator.class.php b/UrlGenerator.class.php
new file mode 100644
index 0000000..03c5b14
--- /dev/null
+++ b/UrlGenerator.class.php
@@ -0,0 +1,25 @@
+<?php
+class UrlGenerator {
+ public $baseurl;
+
+ function __construct($baseurl) {
+ $this->baseurl = $baseurl;
+ }
+
+ function queue($queue) {
+ return $this->baseurl . "/queue/" . $queue;
+ }
+
+ function enqueue($queue) {
+ return $this->baseurl . "/enqueue/" . $queue;
+ }
+
+ function item($queue, $item) {
+ return $this->baseurl . "/item/" . $queue . "/" . $id;
+ }
+
+ function item_enclosure($queue, $id) {
+ return $this->baseurl . "/item/" . $queue . "/" . $id . "?enclosure";
+ }
+}
+?>
diff --git a/db.php b/db.php
new file mode 100644
index 0000000..8cd8bc9
--- /dev/null
+++ b/db.php
@@ -0,0 +1,124 @@
+<?php
+
+class Queue {
+ public $name;
+ public $endpoint_url;
+
+ function toPlainText($url_generator) {
+ return "Name: " . $this->name . "\n" .
+ "Endpoint URL: " . (isset($this->endpoint_url) ? $this->endpoint_url : "Not set") . "\n" .
+ "Enqueue: " . $url_generator->enqueue($this->name) . "\n";
+ }
+
+ static function valid_name($str) {
+ return preg_match('/^[a-zA-Z0-9]+$/', $str);
+ }
+}
+
+function item_from_db($id, $queue_name, $created, $headers) {
+ return new Item($id, $queue_name, $created, json_decode($headers));
+}
+
+class Item {
+ public $id;
+ public $queue_name;
+ public $created;
+ public $headers;
+
+ function __construct($id, $queue_name, $created, $headers) {
+ $this->id = $id;
+ $this->queue_name = $queue_name;
+ $this->created = $created;
+ $this->headers = $headers;
+ }
+
+ function toPlainText($url_generator) {
+ $headers = "";
+ foreach($this->headers as $k => $v) {
+ $headers = $headers . " " . $k . ": " . $v . "\n";
+ }
+ return "Headers:\n" . $headers;
+ }
+}
+
+class DB {
+ var $db;
+
+ function __construct() {
+ $this->db = new PDO("sqlite:/tmp/mq/mq.db");
+ $this->db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
+ $this->db->exec("PRAGMA foreign_keys = ON;");
+ $this->db->beginTransaction();
+ }
+
+ function commit() {
+ $this->db->commit();
+ }
+
+ function delete_queue_and_items($name) {
+ $stmt = $this->db->prepare("DELETE FROM item WHERE queue_name = :name");
+ $stmt->bindParam(":name", $name);
+ $stmt->execute();
+
+ $stmt = $this->db->prepare("DELETE FROM queue WHERE name = :name");
+ $stmt->bindParam(":name", $name);
+ $stmt->execute();
+ }
+
+ function insert_queue($name, $endpoint_url) {
+ $stmt = $this->db->prepare("INSERT INTO queue(name, endpoint_url) VALUES(:name, :endpoint_url)");
+ $stmt->bindParam(":name", $name);
+ $stmt->bindParam(":endpoint_url", $endpoint_url);
+ $stmt->execute();
+ }
+
+ function update_queue($name, $endpoint_url) {
+ $stmt = $this->db->prepare("UPDATE queue SET endpoint_url = :endpoint_url WHERE name = :name");
+ $stmt->bindParam(":endpoint_url", $endpoint_url);
+ $stmt->bindParam(":name", $name);
+ $stmt->execute();
+ }
+
+ function insert_item($id, array $headers, array $queues) {
+ foreach($queues as $queue) {
+ $stmt = $this->db->prepare("INSERT INTO item(id, queue_name, created, headers) VALUES(:id, :queue_name, datetime('now'), :headers)");
+ $stmt->bindParam(":id", $id);
+ $stmt->bindParam(":queue_name", $queue);
+ $stmt->bindParam(":headers", json_encode($headers));
+ $stmt->execute();
+ }
+ }
+
+ function select_queues() {
+ $stmt = $this->db->prepare("SELECT name, endpoint_url FROM queue ORDER BY name");
+ $stmt->execute();
+ return $stmt->fetchAll(PDO::FETCH_CLASS, "Queue");
+ }
+
+ function select_queue($queue) {
+ $stmt = $this->db->prepare("SELECT name, endpoint_url FROM queue WHERE name = :name");
+ $stmt->setFetchMode(PDO::FETCH_CLASS, "Queue");
+ $stmt->bindParam(":name", $queue);
+ $stmt->execute();
+ return $stmt->fetch(PDO::FETCH_CLASS);
+ }
+
+ function select_item($queue_name, $id) {
+ $stmt = $this->db->prepare("SELECT id, queue_name, created, headers FROM item WHERE queue_name = :queue_name AND id = :id");
+ $stmt->bindParam(":queue_name", $queue_name);
+ $stmt->bindParam(":id", $id);
+ $stmt->execute();
+ return $stmt->fetchAll(PDO::FETCH_FUNC, 'item_from_db');
+ }
+
+ function select_items($queue_name, $limit = 10, $offset = 0) {
+ $stmt = $this->db->prepare("SELECT id, queue_name, created, headers FROM item WHERE queue_name = :queue_name LIMIT :limit OFFSET :offset");
+ $stmt->bindParam(":queue_name", $queue_name);
+ $stmt->bindParam(":limit", $limit);
+ $stmt->bindParam(":offset", $offset);
+ $stmt->execute();
+ return $stmt->fetchAll(PDO::FETCH_FUNC, 'item_from_db');
+ }
+}
+
+?>
diff --git a/endpoint.php b/endpoint.php
new file mode 100644
index 0000000..e133c90
--- /dev/null
+++ b/endpoint.php
@@ -0,0 +1,36 @@
+<?php
+$id = $_SERVER["PATH_INFO"];
+$match = preg_match('/^\/[0-9a-zA-Z]+$/', $id);
+if($match == 0 || $match == FALSE) {
+ header("HTTP/1.1 500 Invalid/missing parameter 'id'.");
+ exit(0);
+}
+
+$id = substr($id, 1);
+
+$filename = "/tmp/mq/endpoint." . $id . ".log";
+
+$fd = fopen($filename, "a");
+fwrite($fd, date(DateTime::ATOM));
+fwrite($fd, "\n");
+foreach($_SERVER as $key => $value) {
+ if(strpos($key, "HTTP_") === 0) {
+ $key = str_replace(' ', '-', ucwords(strtolower(str_replace('_', ' ', substr($key, 5)))));
+ fwrite($fd, $key . ": ". $value . "\n");
+ }
+}
+$content_type = $_SERVER["CONTENT_TYPE"];
+if(isset($content_type)) {
+ fwrite($fd, "Content-Type: ". $content_type . "\n");
+}
+
+$content_length = $_SERVER["CONTENT_LENGTH"];
+if(isset($content_length)) {
+ fwrite($fd, "Content-Length: ". $content_length . "\n");
+}
+fwrite($fd, "\n");
+fflush($fd);
+fclose($fd);
+
+header("HTTP/1.1 200 Stored message for endpoint '" . $id . "'.");
+?>
diff --git a/mq.sql b/mq.sql
new file mode 100644
index 0000000..80b8744
--- /dev/null
+++ b/mq.sql
@@ -0,0 +1,24 @@
+PRAGMA foreign_keys = ON;
+
+DROP TABLE item;
+DROP TABLE queue;
+
+CREATE TABLE queue(
+ name VARCHAR(100) PRIMARY KEY,
+ endpoint_url VARCHAR(200)
+);
+
+CREATE TABLE item(
+ id CHAR(13),
+ queue_name VARCHAR(100) NOT NULL,
+ created INTEGER NOT NULL,
+ headers CLOB NOT NULL,
+ PRIMARY KEY (id, queue_name),
+ FOREIGN KEY(queue_name) REFERENCES queue(name)
+);
+
+INSERT INTO queue VALUES('a', NULL);
+INSERT INTO queue VALUES('github', NULL);
+INSERT INTO queue VALUES('b', "http://trygvis.dyndns.org/~trygvis/2012/02/http-mq/endpoint.php");
+-- INSERT INTO item VALUES('1', 'a', datetime('now'));
+-- INSERT INTO item VALUES('1', 'b', datetime('now'));
diff --git a/queue.php b/queue.php
new file mode 100644
index 0000000..26d0f30
--- /dev/null
+++ b/queue.php
@@ -0,0 +1,346 @@
+<?php
+include_once "db.php";
+include_once "UrlGenerator.class.php";
+
+header("Content-Type: text/plain");
+
+function tmp_file($id) {
+ return "/tmp/mq/tmp/$id";
+}
+
+function data_file($id) {
+ return "/tmp/mq/data/$id";
+}
+
+/*
+var_dump($_SERVER);
+*/
+$method = $_SERVER["REQUEST_METHOD"];
+$content_type = isset($_SERVER["CONTENT_TYPE"]) ? $_SERVER["CONTENT_TYPE"] : NULL;
+$db = new DB();
+
+$form = array();
+
+if(isset($content_type) && $content_type == "application/x-www-form-urlencoded") {
+ $body = file_get_contents("php://input");
+ $x = urldecode($body);
+ $parts = split("&", $x);
+ foreach($parts as $part) {
+ list($key, $value) = split("=", $part);
+ $form[$key] = $value;
+ }
+ var_dump($form);
+}
+
+$path_info = $_SERVER["PATH_INFO"];
+if(!isset($path_info)) {
+ header("HTTP/1.1 302");
+ header("Location: " . $_SERVER["PHP_SELF"] . "/status");
+}
+else {
+ $path_info = preg_replace('/[\/]$/', '', $path_info);
+ $path_info = preg_replace('/^[\/]/', '', $path_info);
+ $segments = explode("/", $path_info);
+ $baseurl = "http://" . $_SERVER["SERVER_NAME"] . ":" . $_SERVER["SERVER_PORT"] . $_SERVER["SCRIPT_NAME"];
+ $url_generator = new UrlGenerator($baseurl);
+ switch($segments[0]) {
+ case "enqueue":
+ allow_methods("POST");
+ if(count($segments) != 2) {
+ not_found();
+ }
+ enqueue($url_generator, $segments[1]);
+ break;
+ case "item":
+ allow_methods("GET");
+ if(count($segments) != 3) {
+ not_found();
+ }
+ item($url_generator, $segments[1], $segments[2]);
+ break;
+ case "queue":
+ switch(count($segments)) {
+ case 1:
+ queue($url_generator);
+ break;
+ case 2:
+ queues($url_generator, $segments[1]);
+ break;
+ default:
+ not_found();
+ }
+ break;
+ case "cron":
+ cron();
+ break;
+ default:
+ not_found();
+ break;
+ }
+}
+
+$db->commit();
+$db = null;
+
+function allow_methods($methods) {
+ global $method;
+ if(is_string($methods)) {
+ if($method == $methods) {
+ return;
+ }
+ header("HTTP/1.1 405 Invalid method");
+ header("Allow: " . $methods);
+ exit(0);
+ }
+}
+
+function unsupported_media_type() {
+ header("HTTP/1.1 415 Unsupported Media Type");
+ exit(0);
+}
+
+function conflict($short) {
+ header("HTTP/1.1 409 " . $short);
+ exit(0);
+}
+
+function not_found($short = NULL, $long = NULL) {
+ if(!isset($short)) {
+ $short = "Not found";
+ }
+ header("HTTP/1.1 404 " . $short);
+ if(isset($long)) {
+ echo($long);
+ }
+ exit(0);
+}
+
+function bad_request($short = NULL, $long = NULL) {
+ if(!isset($short)) {
+ $short = "Bad Request";
+ }
+ header("HTTP/1.1 400 " . $short);
+ if(isset($long)) {
+ echo($long);
+ }
+ exit(0);
+}
+
+function cron() {
+ global $db;
+
+ allow_methods("POST");
+
+ $queues = $db->select_queues();
+ foreach($queues as $queue) {
+ if(!isset($queue->endpoint_url)) {
+ echo("No endpoint configured for queue " . $queue->name . "\n");
+ continue;
+ }
+ $items = $db->select_items($queue->name, 1, 0);
+ $count = count($items);
+
+ if($count > 0) {
+ $item = $items[0];
+ process_item($queue, $items[0]);
+ break;
+ }
+
+ echo("Queue " . $queue->name . " is empty.\n");
+ }
+}
+
+function process_item($queue, $item) {
+ echo("Processing item " . $item->id . " from queue " . $queue->name . "\n");
+ $handle = curl_init();
+
+ // http://no2.php.net/manual/en/function.curl-setopt.php
+
+ // Set referer on redirecet
+ curl_setopt($handle, CURLOPT_AUTOREFERER, TRUE);
+ // Follow redirects
+ curl_setopt($handle, CURLOPT_FOLLOWLOCATION, TRUE);
+ // We're doing POSTs only.
+ curl_setopt($handle, CURLOPT_POST, TRUE);
+ curl_setopt($handle, CURLOPT_UPLOAD, TRUE);
+ curl_setopt($handle, CURLOPT_MAXREDIRS, 10);
+
+ curl_setopt($handle, CURLOPT_PROTOCOLS, CURLPROTO_HTTP | CURLPROTO_HTTPS);
+ // This should be 2 in production
+ curl_setopt($handle, CURLOPT_SSL_VERIFYHOST, 1);
+
+ // Return the transfer as a string of the return value of curl_exec()
+ curl_setopt($handle, CURLOPT_RETURNTRANSFER, true);
+
+ // In seconds
+ curl_setopt($handle, CURLOPT_TIMEOUT, 30);
+
+ curl_setopt($handle, CURLOPT_URL, $queue->endpoint_url);
+ var_dump($item->headers);
+ curl_setopt($handle, CURLOPT_HEADER, $item->headers);
+
+ curl_setopt($handle, CURLOPT_INFILE, data_file($item->id));
+
+ $ret = curl_exec($handle);
+ if($ret === false) {
+ echo("curl failed");
+ echo("error: " . curl_error($handle));
+ }
+ else {
+ echo("curl success");
+ echo($ret);
+ }
+
+ curl_close($handle);
+}
+
+function enqueue($url_generator, $queue) {
+ global $db;
+
+ $id = uniqid();
+ $tmp_file = tmp_file($id);
+ $data_file = data_file($id);
+
+ try {
+ $stdin = fopen('php://input', 'r');
+ file_put_contents($tmp_file, $stdin);
+ link($tmp_file, $data_file);
+ unlink($tmp_file);
+
+ $headers = array();
+ foreach($_SERVER as $key => $value) {
+ if(strpos($key, "HTTP_") === 0)
+ $headers[strtolower(str_replace('_', '-', substr($key, 5)))] = $value;
+ }
+ $headers["content-type"] = $_SERVER["CONTENT_TYPE"];
+ $headers["content-length"] = $_SERVER["CONTENT_LENGTH"];
+ var_dump($headers);
+
+ $db->insert_item($id, $headers, array($queue));
+
+ header("HTTP/1.1 201 Created as " . $id);
+ header("Location: " . $url_generator->item($id));
+ } catch (Exception $e) {
+ header("HTTP/1.1 500 Error");
+ echo("Caught exception: " . $e->getMessage() . "\n");
+ }
+}
+
+function item($url_generator, $queue, $id) {
+ global $db;
+
+ $item = $db->select_item($queue, $id);
+// var_dump($item);
+ $item = $item[0];
+ header("Link: <" . $url_generator->item_enclosure($item->id) . ">;rel=enclosure");
+ echo("Showing: " . $item->id . "\n");
+ echo($item->toPlainText($url_generator));
+}
+
+function queue($url_generator) {
+ global $method, $content_type, $db;
+
+ switch($method) {
+ case "GET":
+ case "HEAD":
+ $queues = $db->select_queues();
+ $count = count($queues);
+
+ header("Content-Type: text/plain");
+ if($count == 1) {
+ echo("Have one queue");
+ }
+ else {
+ echo("Have " . $count . " queues");
+ }
+ echo("\n\n");
+ foreach($queues as $q) {
+ echo($q->toPlainText($url_generator) . "\n");
+ }
+ break;
+ case "POST":
+ $name = NULL;
+ $endpoint_url = NULL;
+ switch($content_type) {
+ case "application/x-www-form-urlencoded":
+ $name = uniqid();
+ $endpoint_url = $_POST["endpoint_url"];
+ $db->insert_queue($name, $endpoint_url);
+ header("HTTP/1.1 201 Queue created as " . $name);
+ header("Location: " . $url_generator->queue($name));
+ break;
+ default:
+ unsupported_media_type();
+ }
+ break;
+ default:
+ allow_methods("GET", "HEAD");
+ }
+}
+
+function queues($url_generator, $queue) {
+ global $method, $content_type, $form, $db;
+
+ if(!Queue::valid_name($queue)) {
+ not_found("Invalid queue name");
+ }
+
+ switch($method) {
+ case "GET":
+ case "HEAD":
+ $q = $db->select_queue($queue);
+ if(!isset($q)) {
+ not_found("No such queue: " . $queue);
+ }
+
+ header("Content-Type: text/plain");
+ echo($q->toPlainText($url_generator));
+ break;
+ case "PUT":
+ $q = $db->select_queue($queue);
+ if($q != NULL) {
+ conflict("Queue already exist: " . $queue);
+ }
+ switch($content_type) {
+ case "application/x-www-form-urlencoded":
+ $name = $queue;
+ $endpoint_url = $_POST["endpoint_url"];
+ $db->insert_queue($name, $endpoint_url);
+ header("HTTP/1.1 201 Queue created as " . $name);
+ header("Location: " . $url_generator->queue($name));
+ break;
+ default:
+ unsupported_media_type();
+ }
+ break;
+ case "PATCH":
+ $q = $db->select_queue($queue);
+ if($q == NULL) {
+ not_found("No such queue: " . $queue);
+ }
+ switch($content_type) {
+ case "application/x-www-form-urlencoded":
+ var_dump($form);
+ $endpoint_url = $form["endpoint_url"];
+ if(!isset($endpoint_url)) {
+ header("HTTP/1.1 400 Bad request; missing endpoint_url");
+ exit(0);
+ }
+ else {
+ $db->update_queue($queue, $endpoint_url);
+ header("HTTP/1.1 204 Queue updated " . $queue);
+ header("Content-Location: " . $url_generator->queue($queue));
+ }
+ break;
+ default:
+ unsupported_media_type();
+ }
+ break;
+ case "DELETE":
+ $db->delete_queue_and_items($queue);
+ break;
+ default:
+ allow_methods("GET", "HEAD", "DELETE");
+ }
+}
+?>