summaryrefslogtreecommitdiff
path: root/queue.php
diff options
context:
space:
mode:
Diffstat (limited to 'queue.php')
-rw-r--r--queue.php346
1 files changed, 346 insertions, 0 deletions
diff --git a/queue.php b/queue.php
new file mode 100644
index 0000000..26d0f30
--- /dev/null
+++ b/queue.php
@@ -0,0 +1,346 @@
+<?php
+include_once "db.php";
+include_once "UrlGenerator.class.php";
+
+header("Content-Type: text/plain");
+
+function tmp_file($id) {
+ return "/tmp/mq/tmp/$id";
+}
+
+function data_file($id) {
+ return "/tmp/mq/data/$id";
+}
+
+/*
+var_dump($_SERVER);
+*/
+$method = $_SERVER["REQUEST_METHOD"];
+$content_type = isset($_SERVER["CONTENT_TYPE"]) ? $_SERVER["CONTENT_TYPE"] : NULL;
+$db = new DB();
+
+$form = array();
+
+if(isset($content_type) && $content_type == "application/x-www-form-urlencoded") {
+ $body = file_get_contents("php://input");
+ $x = urldecode($body);
+ $parts = split("&", $x);
+ foreach($parts as $part) {
+ list($key, $value) = split("=", $part);
+ $form[$key] = $value;
+ }
+ var_dump($form);
+}
+
+$path_info = $_SERVER["PATH_INFO"];
+if(!isset($path_info)) {
+ header("HTTP/1.1 302");
+ header("Location: " . $_SERVER["PHP_SELF"] . "/status");
+}
+else {
+ $path_info = preg_replace('/[\/]$/', '', $path_info);
+ $path_info = preg_replace('/^[\/]/', '', $path_info);
+ $segments = explode("/", $path_info);
+ $baseurl = "http://" . $_SERVER["SERVER_NAME"] . ":" . $_SERVER["SERVER_PORT"] . $_SERVER["SCRIPT_NAME"];
+ $url_generator = new UrlGenerator($baseurl);
+ switch($segments[0]) {
+ case "enqueue":
+ allow_methods("POST");
+ if(count($segments) != 2) {
+ not_found();
+ }
+ enqueue($url_generator, $segments[1]);
+ break;
+ case "item":
+ allow_methods("GET");
+ if(count($segments) != 3) {
+ not_found();
+ }
+ item($url_generator, $segments[1], $segments[2]);
+ break;
+ case "queue":
+ switch(count($segments)) {
+ case 1:
+ queue($url_generator);
+ break;
+ case 2:
+ queues($url_generator, $segments[1]);
+ break;
+ default:
+ not_found();
+ }
+ break;
+ case "cron":
+ cron();
+ break;
+ default:
+ not_found();
+ break;
+ }
+}
+
+$db->commit();
+$db = null;
+
+function allow_methods($methods) {
+ global $method;
+ if(is_string($methods)) {
+ if($method == $methods) {
+ return;
+ }
+ header("HTTP/1.1 405 Invalid method");
+ header("Allow: " . $methods);
+ exit(0);
+ }
+}
+
+function unsupported_media_type() {
+ header("HTTP/1.1 415 Unsupported Media Type");
+ exit(0);
+}
+
+function conflict($short) {
+ header("HTTP/1.1 409 " . $short);
+ exit(0);
+}
+
+function not_found($short = NULL, $long = NULL) {
+ if(!isset($short)) {
+ $short = "Not found";
+ }
+ header("HTTP/1.1 404 " . $short);
+ if(isset($long)) {
+ echo($long);
+ }
+ exit(0);
+}
+
+function bad_request($short = NULL, $long = NULL) {
+ if(!isset($short)) {
+ $short = "Bad Request";
+ }
+ header("HTTP/1.1 400 " . $short);
+ if(isset($long)) {
+ echo($long);
+ }
+ exit(0);
+}
+
+function cron() {
+ global $db;
+
+ allow_methods("POST");
+
+ $queues = $db->select_queues();
+ foreach($queues as $queue) {
+ if(!isset($queue->endpoint_url)) {
+ echo("No endpoint configured for queue " . $queue->name . "\n");
+ continue;
+ }
+ $items = $db->select_items($queue->name, 1, 0);
+ $count = count($items);
+
+ if($count > 0) {
+ $item = $items[0];
+ process_item($queue, $items[0]);
+ break;
+ }
+
+ echo("Queue " . $queue->name . " is empty.\n");
+ }
+}
+
+function process_item($queue, $item) {
+ echo("Processing item " . $item->id . " from queue " . $queue->name . "\n");
+ $handle = curl_init();
+
+ // http://no2.php.net/manual/en/function.curl-setopt.php
+
+ // Set referer on redirecet
+ curl_setopt($handle, CURLOPT_AUTOREFERER, TRUE);
+ // Follow redirects
+ curl_setopt($handle, CURLOPT_FOLLOWLOCATION, TRUE);
+ // We're doing POSTs only.
+ curl_setopt($handle, CURLOPT_POST, TRUE);
+ curl_setopt($handle, CURLOPT_UPLOAD, TRUE);
+ curl_setopt($handle, CURLOPT_MAXREDIRS, 10);
+
+ curl_setopt($handle, CURLOPT_PROTOCOLS, CURLPROTO_HTTP | CURLPROTO_HTTPS);
+ // This should be 2 in production
+ curl_setopt($handle, CURLOPT_SSL_VERIFYHOST, 1);
+
+ // Return the transfer as a string of the return value of curl_exec()
+ curl_setopt($handle, CURLOPT_RETURNTRANSFER, true);
+
+ // In seconds
+ curl_setopt($handle, CURLOPT_TIMEOUT, 30);
+
+ curl_setopt($handle, CURLOPT_URL, $queue->endpoint_url);
+ var_dump($item->headers);
+ curl_setopt($handle, CURLOPT_HEADER, $item->headers);
+
+ curl_setopt($handle, CURLOPT_INFILE, data_file($item->id));
+
+ $ret = curl_exec($handle);
+ if($ret === false) {
+ echo("curl failed");
+ echo("error: " . curl_error($handle));
+ }
+ else {
+ echo("curl success");
+ echo($ret);
+ }
+
+ curl_close($handle);
+}
+
+function enqueue($url_generator, $queue) {
+ global $db;
+
+ $id = uniqid();
+ $tmp_file = tmp_file($id);
+ $data_file = data_file($id);
+
+ try {
+ $stdin = fopen('php://input', 'r');
+ file_put_contents($tmp_file, $stdin);
+ link($tmp_file, $data_file);
+ unlink($tmp_file);
+
+ $headers = array();
+ foreach($_SERVER as $key => $value) {
+ if(strpos($key, "HTTP_") === 0)
+ $headers[strtolower(str_replace('_', '-', substr($key, 5)))] = $value;
+ }
+ $headers["content-type"] = $_SERVER["CONTENT_TYPE"];
+ $headers["content-length"] = $_SERVER["CONTENT_LENGTH"];
+ var_dump($headers);
+
+ $db->insert_item($id, $headers, array($queue));
+
+ header("HTTP/1.1 201 Created as " . $id);
+ header("Location: " . $url_generator->item($id));
+ } catch (Exception $e) {
+ header("HTTP/1.1 500 Error");
+ echo("Caught exception: " . $e->getMessage() . "\n");
+ }
+}
+
+function item($url_generator, $queue, $id) {
+ global $db;
+
+ $item = $db->select_item($queue, $id);
+// var_dump($item);
+ $item = $item[0];
+ header("Link: <" . $url_generator->item_enclosure($item->id) . ">;rel=enclosure");
+ echo("Showing: " . $item->id . "\n");
+ echo($item->toPlainText($url_generator));
+}
+
+function queue($url_generator) {
+ global $method, $content_type, $db;
+
+ switch($method) {
+ case "GET":
+ case "HEAD":
+ $queues = $db->select_queues();
+ $count = count($queues);
+
+ header("Content-Type: text/plain");
+ if($count == 1) {
+ echo("Have one queue");
+ }
+ else {
+ echo("Have " . $count . " queues");
+ }
+ echo("\n\n");
+ foreach($queues as $q) {
+ echo($q->toPlainText($url_generator) . "\n");
+ }
+ break;
+ case "POST":
+ $name = NULL;
+ $endpoint_url = NULL;
+ switch($content_type) {
+ case "application/x-www-form-urlencoded":
+ $name = uniqid();
+ $endpoint_url = $_POST["endpoint_url"];
+ $db->insert_queue($name, $endpoint_url);
+ header("HTTP/1.1 201 Queue created as " . $name);
+ header("Location: " . $url_generator->queue($name));
+ break;
+ default:
+ unsupported_media_type();
+ }
+ break;
+ default:
+ allow_methods("GET", "HEAD");
+ }
+}
+
+function queues($url_generator, $queue) {
+ global $method, $content_type, $form, $db;
+
+ if(!Queue::valid_name($queue)) {
+ not_found("Invalid queue name");
+ }
+
+ switch($method) {
+ case "GET":
+ case "HEAD":
+ $q = $db->select_queue($queue);
+ if(!isset($q)) {
+ not_found("No such queue: " . $queue);
+ }
+
+ header("Content-Type: text/plain");
+ echo($q->toPlainText($url_generator));
+ break;
+ case "PUT":
+ $q = $db->select_queue($queue);
+ if($q != NULL) {
+ conflict("Queue already exist: " . $queue);
+ }
+ switch($content_type) {
+ case "application/x-www-form-urlencoded":
+ $name = $queue;
+ $endpoint_url = $_POST["endpoint_url"];
+ $db->insert_queue($name, $endpoint_url);
+ header("HTTP/1.1 201 Queue created as " . $name);
+ header("Location: " . $url_generator->queue($name));
+ break;
+ default:
+ unsupported_media_type();
+ }
+ break;
+ case "PATCH":
+ $q = $db->select_queue($queue);
+ if($q == NULL) {
+ not_found("No such queue: " . $queue);
+ }
+ switch($content_type) {
+ case "application/x-www-form-urlencoded":
+ var_dump($form);
+ $endpoint_url = $form["endpoint_url"];
+ if(!isset($endpoint_url)) {
+ header("HTTP/1.1 400 Bad request; missing endpoint_url");
+ exit(0);
+ }
+ else {
+ $db->update_queue($queue, $endpoint_url);
+ header("HTTP/1.1 204 Queue updated " . $queue);
+ header("Content-Location: " . $url_generator->queue($queue));
+ }
+ break;
+ default:
+ unsupported_media_type();
+ }
+ break;
+ case "DELETE":
+ $db->delete_queue_and_items($queue);
+ break;
+ default:
+ allow_methods("GET", "HEAD", "DELETE");
+ }
+}
+?>