diff options
-rw-r--r-- | UrlGenerator.class.php | 25 | ||||
-rw-r--r-- | db.php | 124 | ||||
-rw-r--r-- | endpoint.php | 36 | ||||
-rw-r--r-- | mq.sql | 24 | ||||
-rw-r--r-- | queue.php | 346 |
5 files changed, 555 insertions, 0 deletions
diff --git a/UrlGenerator.class.php b/UrlGenerator.class.php new file mode 100644 index 0000000..03c5b14 --- /dev/null +++ b/UrlGenerator.class.php @@ -0,0 +1,25 @@ +<?php +class UrlGenerator { + public $baseurl; + + function __construct($baseurl) { + $this->baseurl = $baseurl; + } + + function queue($queue) { + return $this->baseurl . "/queue/" . $queue; + } + + function enqueue($queue) { + return $this->baseurl . "/enqueue/" . $queue; + } + + function item($queue, $item) { + return $this->baseurl . "/item/" . $queue . "/" . $id; + } + + function item_enclosure($queue, $id) { + return $this->baseurl . "/item/" . $queue . "/" . $id . "?enclosure"; + } +} +?> @@ -0,0 +1,124 @@ +<?php + +class Queue { + public $name; + public $endpoint_url; + + function toPlainText($url_generator) { + return "Name: " . $this->name . "\n" . + "Endpoint URL: " . (isset($this->endpoint_url) ? $this->endpoint_url : "Not set") . "\n" . + "Enqueue: " . $url_generator->enqueue($this->name) . "\n"; + } + + static function valid_name($str) { + return preg_match('/^[a-zA-Z0-9]+$/', $str); + } +} + +function item_from_db($id, $queue_name, $created, $headers) { + return new Item($id, $queue_name, $created, json_decode($headers)); +} + +class Item { + public $id; + public $queue_name; + public $created; + public $headers; + + function __construct($id, $queue_name, $created, $headers) { + $this->id = $id; + $this->queue_name = $queue_name; + $this->created = $created; + $this->headers = $headers; + } + + function toPlainText($url_generator) { + $headers = ""; + foreach($this->headers as $k => $v) { + $headers = $headers . " " . $k . ": " . $v . "\n"; + } + return "Headers:\n" . $headers; + } +} + +class DB { + var $db; + + function __construct() { + $this->db = new PDO("sqlite:/tmp/mq/mq.db"); + $this->db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION); + $this->db->exec("PRAGMA foreign_keys = ON;"); + $this->db->beginTransaction(); + } + + function commit() { + $this->db->commit(); + } + + function delete_queue_and_items($name) { + $stmt = $this->db->prepare("DELETE FROM item WHERE queue_name = :name"); + $stmt->bindParam(":name", $name); + $stmt->execute(); + + $stmt = $this->db->prepare("DELETE FROM queue WHERE name = :name"); + $stmt->bindParam(":name", $name); + $stmt->execute(); + } + + function insert_queue($name, $endpoint_url) { + $stmt = $this->db->prepare("INSERT INTO queue(name, endpoint_url) VALUES(:name, :endpoint_url)"); + $stmt->bindParam(":name", $name); + $stmt->bindParam(":endpoint_url", $endpoint_url); + $stmt->execute(); + } + + function update_queue($name, $endpoint_url) { + $stmt = $this->db->prepare("UPDATE queue SET endpoint_url = :endpoint_url WHERE name = :name"); + $stmt->bindParam(":endpoint_url", $endpoint_url); + $stmt->bindParam(":name", $name); + $stmt->execute(); + } + + function insert_item($id, array $headers, array $queues) { + foreach($queues as $queue) { + $stmt = $this->db->prepare("INSERT INTO item(id, queue_name, created, headers) VALUES(:id, :queue_name, datetime('now'), :headers)"); + $stmt->bindParam(":id", $id); + $stmt->bindParam(":queue_name", $queue); + $stmt->bindParam(":headers", json_encode($headers)); + $stmt->execute(); + } + } + + function select_queues() { + $stmt = $this->db->prepare("SELECT name, endpoint_url FROM queue ORDER BY name"); + $stmt->execute(); + return $stmt->fetchAll(PDO::FETCH_CLASS, "Queue"); + } + + function select_queue($queue) { + $stmt = $this->db->prepare("SELECT name, endpoint_url FROM queue WHERE name = :name"); + $stmt->setFetchMode(PDO::FETCH_CLASS, "Queue"); + $stmt->bindParam(":name", $queue); + $stmt->execute(); + return $stmt->fetch(PDO::FETCH_CLASS); + } + + function select_item($queue_name, $id) { + $stmt = $this->db->prepare("SELECT id, queue_name, created, headers FROM item WHERE queue_name = :queue_name AND id = :id"); + $stmt->bindParam(":queue_name", $queue_name); + $stmt->bindParam(":id", $id); + $stmt->execute(); + return $stmt->fetchAll(PDO::FETCH_FUNC, 'item_from_db'); + } + + function select_items($queue_name, $limit = 10, $offset = 0) { + $stmt = $this->db->prepare("SELECT id, queue_name, created, headers FROM item WHERE queue_name = :queue_name LIMIT :limit OFFSET :offset"); + $stmt->bindParam(":queue_name", $queue_name); + $stmt->bindParam(":limit", $limit); + $stmt->bindParam(":offset", $offset); + $stmt->execute(); + return $stmt->fetchAll(PDO::FETCH_FUNC, 'item_from_db'); + } +} + +?> diff --git a/endpoint.php b/endpoint.php new file mode 100644 index 0000000..e133c90 --- /dev/null +++ b/endpoint.php @@ -0,0 +1,36 @@ +<?php +$id = $_SERVER["PATH_INFO"]; +$match = preg_match('/^\/[0-9a-zA-Z]+$/', $id); +if($match == 0 || $match == FALSE) { + header("HTTP/1.1 500 Invalid/missing parameter 'id'."); + exit(0); +} + +$id = substr($id, 1); + +$filename = "/tmp/mq/endpoint." . $id . ".log"; + +$fd = fopen($filename, "a"); +fwrite($fd, date(DateTime::ATOM)); +fwrite($fd, "\n"); +foreach($_SERVER as $key => $value) { + if(strpos($key, "HTTP_") === 0) { + $key = str_replace(' ', '-', ucwords(strtolower(str_replace('_', ' ', substr($key, 5))))); + fwrite($fd, $key . ": ". $value . "\n"); + } +} +$content_type = $_SERVER["CONTENT_TYPE"]; +if(isset($content_type)) { + fwrite($fd, "Content-Type: ". $content_type . "\n"); +} + +$content_length = $_SERVER["CONTENT_LENGTH"]; +if(isset($content_length)) { + fwrite($fd, "Content-Length: ". $content_length . "\n"); +} +fwrite($fd, "\n"); +fflush($fd); +fclose($fd); + +header("HTTP/1.1 200 Stored message for endpoint '" . $id . "'."); +?> @@ -0,0 +1,24 @@ +PRAGMA foreign_keys = ON; + +DROP TABLE item; +DROP TABLE queue; + +CREATE TABLE queue( + name VARCHAR(100) PRIMARY KEY, + endpoint_url VARCHAR(200) +); + +CREATE TABLE item( + id CHAR(13), + queue_name VARCHAR(100) NOT NULL, + created INTEGER NOT NULL, + headers CLOB NOT NULL, + PRIMARY KEY (id, queue_name), + FOREIGN KEY(queue_name) REFERENCES queue(name) +); + +INSERT INTO queue VALUES('a', NULL); +INSERT INTO queue VALUES('github', NULL); +INSERT INTO queue VALUES('b', "http://trygvis.dyndns.org/~trygvis/2012/02/http-mq/endpoint.php"); +-- INSERT INTO item VALUES('1', 'a', datetime('now')); +-- INSERT INTO item VALUES('1', 'b', datetime('now')); diff --git a/queue.php b/queue.php new file mode 100644 index 0000000..26d0f30 --- /dev/null +++ b/queue.php @@ -0,0 +1,346 @@ +<?php +include_once "db.php"; +include_once "UrlGenerator.class.php"; + +header("Content-Type: text/plain"); + +function tmp_file($id) { + return "/tmp/mq/tmp/$id"; +} + +function data_file($id) { + return "/tmp/mq/data/$id"; +} + +/* +var_dump($_SERVER); +*/ +$method = $_SERVER["REQUEST_METHOD"]; +$content_type = isset($_SERVER["CONTENT_TYPE"]) ? $_SERVER["CONTENT_TYPE"] : NULL; +$db = new DB(); + +$form = array(); + +if(isset($content_type) && $content_type == "application/x-www-form-urlencoded") { + $body = file_get_contents("php://input"); + $x = urldecode($body); + $parts = split("&", $x); + foreach($parts as $part) { + list($key, $value) = split("=", $part); + $form[$key] = $value; + } + var_dump($form); +} + +$path_info = $_SERVER["PATH_INFO"]; +if(!isset($path_info)) { + header("HTTP/1.1 302"); + header("Location: " . $_SERVER["PHP_SELF"] . "/status"); +} +else { + $path_info = preg_replace('/[\/]$/', '', $path_info); + $path_info = preg_replace('/^[\/]/', '', $path_info); + $segments = explode("/", $path_info); + $baseurl = "http://" . $_SERVER["SERVER_NAME"] . ":" . $_SERVER["SERVER_PORT"] . $_SERVER["SCRIPT_NAME"]; + $url_generator = new UrlGenerator($baseurl); + switch($segments[0]) { + case "enqueue": + allow_methods("POST"); + if(count($segments) != 2) { + not_found(); + } + enqueue($url_generator, $segments[1]); + break; + case "item": + allow_methods("GET"); + if(count($segments) != 3) { + not_found(); + } + item($url_generator, $segments[1], $segments[2]); + break; + case "queue": + switch(count($segments)) { + case 1: + queue($url_generator); + break; + case 2: + queues($url_generator, $segments[1]); + break; + default: + not_found(); + } + break; + case "cron": + cron(); + break; + default: + not_found(); + break; + } +} + +$db->commit(); +$db = null; + +function allow_methods($methods) { + global $method; + if(is_string($methods)) { + if($method == $methods) { + return; + } + header("HTTP/1.1 405 Invalid method"); + header("Allow: " . $methods); + exit(0); + } +} + +function unsupported_media_type() { + header("HTTP/1.1 415 Unsupported Media Type"); + exit(0); +} + +function conflict($short) { + header("HTTP/1.1 409 " . $short); + exit(0); +} + +function not_found($short = NULL, $long = NULL) { + if(!isset($short)) { + $short = "Not found"; + } + header("HTTP/1.1 404 " . $short); + if(isset($long)) { + echo($long); + } + exit(0); +} + +function bad_request($short = NULL, $long = NULL) { + if(!isset($short)) { + $short = "Bad Request"; + } + header("HTTP/1.1 400 " . $short); + if(isset($long)) { + echo($long); + } + exit(0); +} + +function cron() { + global $db; + + allow_methods("POST"); + + $queues = $db->select_queues(); + foreach($queues as $queue) { + if(!isset($queue->endpoint_url)) { + echo("No endpoint configured for queue " . $queue->name . "\n"); + continue; + } + $items = $db->select_items($queue->name, 1, 0); + $count = count($items); + + if($count > 0) { + $item = $items[0]; + process_item($queue, $items[0]); + break; + } + + echo("Queue " . $queue->name . " is empty.\n"); + } +} + +function process_item($queue, $item) { + echo("Processing item " . $item->id . " from queue " . $queue->name . "\n"); + $handle = curl_init(); + + // http://no2.php.net/manual/en/function.curl-setopt.php + + // Set referer on redirecet + curl_setopt($handle, CURLOPT_AUTOREFERER, TRUE); + // Follow redirects + curl_setopt($handle, CURLOPT_FOLLOWLOCATION, TRUE); + // We're doing POSTs only. + curl_setopt($handle, CURLOPT_POST, TRUE); + curl_setopt($handle, CURLOPT_UPLOAD, TRUE); + curl_setopt($handle, CURLOPT_MAXREDIRS, 10); + + curl_setopt($handle, CURLOPT_PROTOCOLS, CURLPROTO_HTTP | CURLPROTO_HTTPS); + // This should be 2 in production + curl_setopt($handle, CURLOPT_SSL_VERIFYHOST, 1); + + // Return the transfer as a string of the return value of curl_exec() + curl_setopt($handle, CURLOPT_RETURNTRANSFER, true); + + // In seconds + curl_setopt($handle, CURLOPT_TIMEOUT, 30); + + curl_setopt($handle, CURLOPT_URL, $queue->endpoint_url); + var_dump($item->headers); + curl_setopt($handle, CURLOPT_HEADER, $item->headers); + + curl_setopt($handle, CURLOPT_INFILE, data_file($item->id)); + + $ret = curl_exec($handle); + if($ret === false) { + echo("curl failed"); + echo("error: " . curl_error($handle)); + } + else { + echo("curl success"); + echo($ret); + } + + curl_close($handle); +} + +function enqueue($url_generator, $queue) { + global $db; + + $id = uniqid(); + $tmp_file = tmp_file($id); + $data_file = data_file($id); + + try { + $stdin = fopen('php://input', 'r'); + file_put_contents($tmp_file, $stdin); + link($tmp_file, $data_file); + unlink($tmp_file); + + $headers = array(); + foreach($_SERVER as $key => $value) { + if(strpos($key, "HTTP_") === 0) + $headers[strtolower(str_replace('_', '-', substr($key, 5)))] = $value; + } + $headers["content-type"] = $_SERVER["CONTENT_TYPE"]; + $headers["content-length"] = $_SERVER["CONTENT_LENGTH"]; + var_dump($headers); + + $db->insert_item($id, $headers, array($queue)); + + header("HTTP/1.1 201 Created as " . $id); + header("Location: " . $url_generator->item($id)); + } catch (Exception $e) { + header("HTTP/1.1 500 Error"); + echo("Caught exception: " . $e->getMessage() . "\n"); + } +} + +function item($url_generator, $queue, $id) { + global $db; + + $item = $db->select_item($queue, $id); +// var_dump($item); + $item = $item[0]; + header("Link: <" . $url_generator->item_enclosure($item->id) . ">;rel=enclosure"); + echo("Showing: " . $item->id . "\n"); + echo($item->toPlainText($url_generator)); +} + +function queue($url_generator) { + global $method, $content_type, $db; + + switch($method) { + case "GET": + case "HEAD": + $queues = $db->select_queues(); + $count = count($queues); + + header("Content-Type: text/plain"); + if($count == 1) { + echo("Have one queue"); + } + else { + echo("Have " . $count . " queues"); + } + echo("\n\n"); + foreach($queues as $q) { + echo($q->toPlainText($url_generator) . "\n"); + } + break; + case "POST": + $name = NULL; + $endpoint_url = NULL; + switch($content_type) { + case "application/x-www-form-urlencoded": + $name = uniqid(); + $endpoint_url = $_POST["endpoint_url"]; + $db->insert_queue($name, $endpoint_url); + header("HTTP/1.1 201 Queue created as " . $name); + header("Location: " . $url_generator->queue($name)); + break; + default: + unsupported_media_type(); + } + break; + default: + allow_methods("GET", "HEAD"); + } +} + +function queues($url_generator, $queue) { + global $method, $content_type, $form, $db; + + if(!Queue::valid_name($queue)) { + not_found("Invalid queue name"); + } + + switch($method) { + case "GET": + case "HEAD": + $q = $db->select_queue($queue); + if(!isset($q)) { + not_found("No such queue: " . $queue); + } + + header("Content-Type: text/plain"); + echo($q->toPlainText($url_generator)); + break; + case "PUT": + $q = $db->select_queue($queue); + if($q != NULL) { + conflict("Queue already exist: " . $queue); + } + switch($content_type) { + case "application/x-www-form-urlencoded": + $name = $queue; + $endpoint_url = $_POST["endpoint_url"]; + $db->insert_queue($name, $endpoint_url); + header("HTTP/1.1 201 Queue created as " . $name); + header("Location: " . $url_generator->queue($name)); + break; + default: + unsupported_media_type(); + } + break; + case "PATCH": + $q = $db->select_queue($queue); + if($q == NULL) { + not_found("No such queue: " . $queue); + } + switch($content_type) { + case "application/x-www-form-urlencoded": + var_dump($form); + $endpoint_url = $form["endpoint_url"]; + if(!isset($endpoint_url)) { + header("HTTP/1.1 400 Bad request; missing endpoint_url"); + exit(0); + } + else { + $db->update_queue($queue, $endpoint_url); + header("HTTP/1.1 204 Queue updated " . $queue); + header("Content-Location: " . $url_generator->queue($queue)); + } + break; + default: + unsupported_media_type(); + } + break; + case "DELETE": + $db->delete_queue_and_items($queue); + break; + default: + allow_methods("GET", "HEAD", "DELETE"); + } +} +?> |