name . "\n" . "Endpoint URL: " . (isset($this->endpoint_url) ? $this->endpoint_url : "Not set") . "\n" . "Enqueue: " . $url_generator->enqueue($this->name) . "\n"; } static function valid_name($str) { return preg_match('/^[a-zA-Z0-9]+$/', $str); } } function item_from_db($id, $queue_name, $created, $headers) { return new Item($id, $queue_name, $created, json_decode($headers)); } class Item { public $id; public $queue_name; public $created; public $headers; function __construct($id, $queue_name, $created, $headers) { $this->id = $id; $this->queue_name = $queue_name; $this->created = $created; $this->headers = $headers; } function toPlainText($url_generator) { $headers = ""; foreach($this->headers as $k => $v) { $headers = $headers . " " . $k . ": " . $v . "\n"; } return "Headers:\n" . $headers; } } class DB { var $db; function __construct() { $this->db = new PDO("sqlite:/tmp/mq/mq.db"); $this->db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION); $this->db->exec("PRAGMA foreign_keys = ON;"); $this->db->beginTransaction(); } function commit() { $this->db->commit(); } function delete_queue_and_items($name) { $stmt = $this->db->prepare("DELETE FROM item WHERE queue_name = :name"); $stmt->bindParam(":name", $name); $stmt->execute(); $stmt = $this->db->prepare("DELETE FROM queue WHERE name = :name"); $stmt->bindParam(":name", $name); $stmt->execute(); } function insert_queue($name, $endpoint_url) { $stmt = $this->db->prepare("INSERT INTO queue(name, endpoint_url) VALUES(:name, :endpoint_url)"); $stmt->bindParam(":name", $name); $stmt->bindParam(":endpoint_url", $endpoint_url); $stmt->execute(); } function update_queue($name, $endpoint_url) { $stmt = $this->db->prepare("UPDATE queue SET endpoint_url = :endpoint_url WHERE name = :name"); $stmt->bindParam(":endpoint_url", $endpoint_url); $stmt->bindParam(":name", $name); $stmt->execute(); } function insert_item($id, array $headers, array $queues) { foreach($queues as $queue) { $stmt = $this->db->prepare("INSERT INTO item(id, queue_name, created, headers) VALUES(:id, :queue_name, datetime('now'), :headers)"); $stmt->bindParam(":id", $id); $stmt->bindParam(":queue_name", $queue); $stmt->bindParam(":headers", json_encode($headers)); $stmt->execute(); } } function select_queues() { $stmt = $this->db->prepare("SELECT name, endpoint_url FROM queue ORDER BY name"); $stmt->execute(); return $stmt->fetchAll(PDO::FETCH_CLASS, "Queue"); } function select_queue($queue) { $stmt = $this->db->prepare("SELECT name, endpoint_url FROM queue WHERE name = :name"); $stmt->setFetchMode(PDO::FETCH_CLASS, "Queue"); $stmt->bindParam(":name", $queue); $stmt->execute(); return $stmt->fetch(PDO::FETCH_CLASS); } function select_item($queue_name, $id) { $stmt = $this->db->prepare("SELECT id, queue_name, created, headers FROM item WHERE queue_name = :queue_name AND id = :id"); $stmt->bindParam(":queue_name", $queue_name); $stmt->bindParam(":id", $id); $stmt->execute(); return $stmt->fetchAll(PDO::FETCH_FUNC, 'item_from_db'); } function select_items($queue_name, $limit = 10, $offset = 0) { $stmt = $this->db->prepare("SELECT id, queue_name, created, headers FROM item WHERE queue_name = :queue_name LIMIT :limit OFFSET :offset"); $stmt->bindParam(":queue_name", $queue_name); $stmt->bindParam(":limit", $limit); $stmt->bindParam(":offset", $offset); $stmt->execute(); return $stmt->fetchAll(PDO::FETCH_FUNC, 'item_from_db'); } } ?>