aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2013-07-04 18:53:30 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2013-07-04 18:53:30 +0200
commit85b434ffc8870a592044b29686f6d2f352fe7c45 (patch)
treee3dc7f14437530f48ccb0d7d124f22e74be569fc
downloadpgsql-http-bgworker-85b434ffc8870a592044b29686f6d2f352fe7c45.tar.gz
pgsql-http-bgworker-85b434ffc8870a592044b29686f6d2f352fe7c45.tar.bz2
pgsql-http-bgworker-85b434ffc8870a592044b29686f6d2f352fe7c45.tar.xz
pgsql-http-bgworker-85b434ffc8870a592044b29686f6d2f352fe7c45.zip
o Initial import of my PostgreSQL background worker + HTTP experiment.
-rw-r--r--.gitignore3
-rw-r--r--.gitmodules5
-rw-r--r--Makefile30
-rw-r--r--README.md15
-rw-r--r--data.sql12
-rw-r--r--httpd.c560
m---------libebb0
7 files changed, 625 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..f2a9932
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+data/
+*.so
+*.o
diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 0000000..0e61cdf
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,5 @@
+[submodule "libebb"]
+ path = libebb
+ url = https://github.com/evanphx/libebb.git
+# Alternate URL for libebb which has a bug with EV_ERROR:
+# https://github.com/evanphx/libebb.git
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..3f82144
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,30 @@
+POSTGRESQL ?= $(HOME)/opt/postgresql-git
+
+PG_CPPFLAGS = -Imongoose -Ilibebb
+SHLIB_LINK = -lev
+
+MODULE_big = httpd
+OBJS=httpd.o libebb/libebb.a
+# mongoose/mongoose.o
+
+PG_CONFIG = $(POSTGRESQL)/bin/pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+
+httpd.o: httpd.c
+libebb/libebb.a:
+ $(MAKE) -C libebb LIBS=-lev
+
+# Utility targets
+
+reinitdb:
+ rm -rf data
+ $(MAKE) initdb
+
+initdb:
+ $(POSTGRESQL)/bin/initdb -D data -E utf8
+ echo "port=5433" >> data/postgresql.conf
+ echo "shared_preload_libraries='httpd'" >> data/postgresql.conf
+
+run:
+ $(POSTGRESQL)/bin/postgres -D data
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..fa3b02b
--- /dev/null
+++ b/README.md
@@ -0,0 +1,15 @@
+This is me mainly scratching an itch, but it is a small proof of
+concept on how possible it is to embed a HTTP server into PostgreSQL
+as a background worker.
+
+Right now it uses libebb as an event-driven HTTP server. As the
+background workers only have a single connection to the database, one
+process can only serve one client at a time. Libebb support multiple
+connection, you better not connect multiple times.
+
+Other possible HTTP implementations:
+
+ * https://github.com/valenok/mongoose.git - tried it, got complicated
+ because it wanted to control forking.
+ * https://github.com/joyent/http-parser - a plain HTTP parser, might
+ be useful if I end up with a custom HTTP server.
diff --git a/data.sql b/data.sql
new file mode 100644
index 0000000..2c68069
--- /dev/null
+++ b/data.sql
@@ -0,0 +1,12 @@
+BEGIN;
+DROP TABLE IF EXISTS mytable;
+CREATE TABLE mytable (
+ id serial,
+ text varchar(100)
+);
+
+INSERT INTO mytable(text)
+SELECT md5(random()::text) FROM generate_series(1, 10)
+-- returning *
+;
+COMMIT;
diff --git a/httpd.c b/httpd.c
new file mode 100644
index 0000000..bc27936
--- /dev/null
+++ b/httpd.c
@@ -0,0 +1,560 @@
+#include <string.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <assert.h>
+
+///////////////////////////////////////
+
+#include "ebb.h"
+struct ev_loop *loop = NULL;
+
+// TODO: register a handler for new_buf that uses palloc
+// TODO: handle errors, figure out how to use ebb_request_parser_has_error
+
+///////////////////////////////////////
+
+#include "postgres.h"
+
+#include "utils/portal.h"
+
+/* These are always necessary for a bgworker */
+#include "miscadmin.h"
+#include "postmaster/bgworker.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/lwlock.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+
+/* these headers are used by this particular worker's code */
+#include "access/xact.h"
+#include "executor/spi.h"
+#include "fmgr.h"
+#include "lib/stringinfo.h"
+#include "pgstat.h"
+#include "utils/builtins.h"
+#include "utils/snapmgr.h"
+#include "tcop/utility.h"
+
+PG_MODULE_MAGIC;
+
+void _PG_init(void);
+
+/* flags set by signal handlers */
+static volatile sig_atomic_t got_sighup = false;
+static volatile sig_atomic_t got_sigterm = false;
+
+/* GUC variables */
+static int worker_spi_naptime = 10;
+
+/* If the plan should be read-only */
+bool read_only = false;
+
+struct {
+ char *path;
+ char *uri;
+ char *query_string;
+ char *query;
+ char *field, *value;
+ bool do_close;
+} http_state;
+
+struct {
+ SPIPlanPtr plan;
+ Portal portal;
+ bool is_cursor;
+} sql_state;
+
+static int start_query() {
+ int ret;
+
+ bzero(&sql_state, sizeof(sql_state));
+ elog(LOG, "start_query: %s", http_state.query);
+
+ /* Connect to the database */
+ SetCurrentStatementStartTimestamp();
+ StartTransactionCommand();
+ ret = SPI_connect();
+ if(ret != SPI_OK_CONNECT) {
+ elog(FATAL, "Could not connect to server: %s", SPI_result_code_string(ret));
+ return 1;
+ }
+ PushActiveSnapshot(GetTransactionSnapshot());
+ pgstat_report_activity(STATE_RUNNING, http_state.query);
+
+ elog(LOG, "SPI_prepare_cursor");
+ sql_state.plan = SPI_prepare_cursor(http_state.query,
+ 0, /* No arguments */
+ NULL, /* Empty array of arguments */
+ CURSOR_OPT_NO_SCROLL);
+ if(SPI_result != 0) {
+ elog(LOG, "SPI_prepare_cursor: SPI_result=%s", SPI_result_code_string(SPI_result));
+ return 1;
+ }
+
+ sql_state.is_cursor = SPI_is_cursor_plan(sql_state.plan);
+ if(sql_state.is_cursor) {
+ sql_state.portal = SPI_cursor_open(NULL,
+ sql_state.plan,
+ NULL, /* No argument values */
+ NULL, /* No arguments that are NULL */
+ read_only); /* This tx is read only */
+ if(SPI_result != 0) {
+ elog(LOG, "SPI_cursor_open: SPI_result=%s", SPI_result_code_string(SPI_result));
+ return 1;
+ }
+ elog(LOG, "SPI_cursor_open: SPI_processed = %d, SPI_tuptable = %p", SPI_processed, SPI_tuptable);
+ } else {
+ ret = SPI_execute_plan(sql_state.plan, NULL, NULL, read_only, 0);
+ if(ret < 0) {
+ elog(LOG, "SPI_execute_plan: SPI_result=%s", SPI_result_code_string(SPI_result));
+ return 1;
+ }
+ elog(LOG, "SPI_execute_plan: SPI_processed=%d", SPI_processed);
+ }
+
+ return 0;
+}
+
+static void complete_query() {
+ elog(LOG, "complete_query");
+ if(sql_state.plan != NULL) {
+ SPI_freeplan(sql_state.plan);
+ }
+ SPI_finish();
+ PopActiveSnapshot();
+ CommitTransactionCommand();
+ pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/***
+ * Web server handler
+ */
+
+/*
+#define MSG ("HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 12\r\n\r\nhello world\n")
+static int c = 0;
+
+struct hello_connection {
+ unsigned int responses_to_write;
+};
+*/
+
+static void on_close(ebb_connection *connection)
+{
+ elog(LOG, "on_close");
+ free(connection->data);
+ free(connection);
+}
+
+static void continue_responding(ebb_connection *connection)
+{
+ int i, j;
+ StringInfoData buf;
+ TupleDesc tupleDesc;
+
+ if(http_state.do_close) {
+ goto complete;
+ }
+
+ initStringInfo(&buf);
+
+ if(sql_state.is_cursor) {
+ SPI_cursor_fetch(sql_state.portal, true /* forward */, 3 /* count */);
+ elog(LOG, "SPI_cursor_fetch: SPI_processed = %d, SPI_tuptable = %p", SPI_processed, SPI_tuptable);
+ if(SPI_result != 0) {
+ elog(FATAL, "SPI_cursor_fetch: SPI_result: %s", SPI_result_code_string(SPI_result));
+ goto complete;
+ }
+ }
+
+ elog(LOG, "SPI_cursor_fetch: SPI_processed = %d, SPI_tuptable = %p", SPI_processed, SPI_tuptable);
+ if(SPI_processed == 0 || SPI_tuptable == NULL) {
+ goto complete;
+ }
+
+ tupleDesc = SPI_tuptable->tupdesc;
+
+ for(i = 0; i < SPI_processed; i++) {
+ for (j = 0; j < tupleDesc->natts; j++) {
+ char *value;
+ if (tupleDesc->attrs[j]->attisdropped) {
+ continue;
+ }
+
+ if(j > 0) {
+ appendStringInfoChar(&buf, ';');
+ }
+ value = SPI_getvalue(SPI_tuptable->vals[i], tupleDesc, j + 1);
+ switch(SPI_result) {
+ case 0:
+ if(value != NULL) {
+ appendStringInfoString(&buf, value);
+ }
+ break;
+ case SPI_ERROR_NOOUTFUNC:
+ // no function to handle the conversion to text
+ break;
+ default:
+ elog(LOG, "SPI_result=%s", SPI_result_code_string(SPI_result));
+ complete_query();
+ ebb_connection_schedule_close(connection);
+ }
+ if(value != NULL) {
+ pfree(value);
+ }
+ }
+ appendStringInfoChar(&buf, '\r');
+ appendStringInfoChar(&buf, '\n');
+ }
+
+ ebb_connection_write(connection, buf.data, buf.len, continue_responding);
+ return;
+
+complete:
+ complete_query();
+ ebb_connection_schedule_close(connection);
+}
+
+static void request_complete(ebb_request *request)
+{
+ ebb_connection *connection;
+ int i;
+
+ //printf("request complete \n");
+ elog(LOG, "request complete");
+ connection = request->data;
+ //connection_data = connection->data;
+
+ /*
+ if(ebb_request_should_keep_alive(request))
+ connection_data->responses_to_write++;
+ else
+ connection_data->responses_to_write = 1;
+ */
+
+ if(start_query()) {
+ StringInfoData buf;
+
+ initStringInfo(&buf);
+ appendStringInfo(&buf, "500 Internal Server Error: %s", SPI_result_code_string(SPI_result));
+ appendStringInfoString(&buf, "Content-Length: 0");
+ http_state.do_close = true;
+ ebb_connection_write(connection, buf.data, buf.len, continue_responding);
+ }
+ else {
+ StringInfoData buf;
+ TupleDesc tupleDesc;
+ if(sql_state.is_cursor) {
+ tupleDesc = sql_state.portal->tupDesc;
+ }
+ else {
+ tupleDesc = SPI_tuptable->tupdesc;
+ }
+
+ initStringInfo(&buf);
+ for (i = 0; i < tupleDesc->natts; i++) {
+ if (tupleDesc->attrs[i]->attisdropped)
+ continue;
+
+ if (i > 0) {
+ appendStringInfoChar(&buf, ';');
+ }
+ appendStringInfoString(&buf, NameStr(tupleDesc->attrs[i]->attname));
+ }
+ appendStringInfoChar(&buf, '\r');
+ appendStringInfoChar(&buf, '\n');
+ ebb_connection_write(connection, buf.data, buf.len, continue_responding);
+ }
+ free(request);
+}
+
+int next_is_query = 0;
+
+static void on_header_field(ebb_request* req, const char *at, size_t length, int header_index) {
+ // elog(LOG, "on_header_field: at=%s, length=%zu, header_index=%d", at, length, header_index);
+ http_state.field = palloc(length + 1);
+ strncpy(http_state.field, at, length);
+ http_state.field[length] = '\0';
+ if(strncasecmp("query", http_state.field, length) == 0) {
+ next_is_query = 1;
+ }
+}
+
+static void on_header_value(ebb_request* req, const char *at, size_t length, int header_index) {
+ // elog(LOG, "on_header_value: at=%s, length=%zu, header_index=%d", at, length, header_index);
+ http_state.value = palloc(length + 1);
+ strncpy(http_state.value, at, length);
+ http_state.value[length] = '\0';
+
+ if(next_is_query == 1) {
+ http_state.query = http_state.value;
+ }
+ elog(LOG, "Header: %s: %s", http_state.field, http_state.value);
+}
+
+static void on_path(ebb_request* req, const char *at, size_t length) {
+ // elog(LOG, "on_header_value: at=%s, length=%zu, header_index=%d", at, length, header_index);
+ http_state.path = palloc(length + 1);
+ strncpy(http_state.path, at, length);
+ http_state.path[length] = '\0';
+
+ elog(LOG, "Path: %s", http_state.path);
+}
+
+static void on_uri(ebb_request* req, const char *at, size_t length) {
+ // elog(LOG, "on_header_value: at=%s, length=%zu, header_index=%d", at, length, header_index);
+ http_state.uri = palloc(length + 1);
+ strncpy(http_state.uri, at, length);
+ http_state.uri[length] = '\0';
+
+ elog(LOG, "Uri: %s", http_state.uri);
+}
+
+static void on_query_string(ebb_request* req, const char *at, size_t length) {
+ // elog(LOG, "on_header_value: at=%s, length=%zu, header_index=%d", at, length, header_index);
+ http_state.query_string = palloc(length + 1);
+ strncpy(http_state.query_string, at, length);
+ http_state.query_string[length] = '\0';
+
+ elog(LOG, "Query string: %s", http_state.query_string);
+}
+
+static ebb_request* new_request(ebb_connection *connection)
+{
+ ebb_request *request;
+
+ bzero(&http_state, sizeof(http_state));
+
+ //printf("request %d\n", ++c);
+ elog(LOG, "request");
+ request = malloc(sizeof(ebb_request));
+ ebb_request_init(request);
+ request->data = connection;
+ request->on_complete = request_complete;
+ request->on_path = on_path;
+ request->on_uri = on_uri;
+ request->on_query_string = on_query_string;
+ request->on_header_field = on_header_field;
+ request->on_header_value = on_header_value;
+ return request;
+}
+
+static ebb_connection* new_connection(ebb_server *server, struct sockaddr_in *addr)
+{
+ /*
+ struct hello_connection *connection_data = malloc(sizeof(struct hello_connection));
+ */
+ ebb_connection *connection;
+
+ /*
+ if(connection_data == NULL)
+ return NULL;
+ connection_data->responses_to_write = 0;
+ */
+
+ connection = malloc(sizeof(ebb_connection));
+ if(connection == NULL) {
+ //free(connection_data);
+ return NULL;
+ }
+
+ ebb_connection_init(connection);
+ // connection->data = connection_data;
+ connection->new_request = new_request;
+ connection->on_close = on_close;
+
+ //printf("connection: %d\n", c++);
+ elog(LOG, "connection");
+ return connection;
+}
+
+/***
+ * Postgresql background worker handlers
+ */
+
+static void worker_spi_sigterm(SIGNAL_ARGS)
+{
+ int save_errno = errno;
+
+ got_sigterm = true;
+ elog(LOG, "httpd: got sigterm");
+ if (MyProc)
+ SetLatch(&MyProc->procLatch);
+ errno = save_errno;
+
+ if(loop)
+ ev_unloop(loop, EVUNLOOP_ALL);
+}
+
+static void worker_spi_sighup(SIGNAL_ARGS)
+{
+ got_sighup = true;
+ elog(LOG, "httpd: got sighup");
+ if (MyProc)
+ SetLatch(&MyProc->procLatch);
+
+ if(loop)
+ ev_unloop(loop, EVUNLOOP_ALL);
+}
+
+static void timer_callback(EV_P_ ev_timer *w, int revents) {
+// elog(LOG, "timer_callback");
+ if(loop)
+ ev_unloop(loop, EVUNLOOP_ALL);
+}
+
+static void worker_spi_main(void *main_arg)
+{
+// StringInfoData buf;
+ struct ev_timer timer;
+ ebb_server server;
+ int port = 8080;
+
+ loop = ev_default_loop(0);
+ ebb_server_init(&server, loop);
+ //ebb_server_set_secure(&server, "examples/ca-cert.pem", "examples/ca-key.pem");
+ server.new_connection = new_connection;
+ server.data = NULL;
+
+ elog(LOG, "httpd worker starting, pid = %d", getpid());
+ ebb_server_listen_on_port(&server, port);
+ elog(LOG, "httpd ready, listening on port %d", port);
+
+ BackgroundWorkerUnblockSignals();
+
+ BackgroundWorkerInitializeConnection("postgres", NULL);
+
+ // do_query();
+
+ ev_timer_init(&timer, timer_callback, 1, 1);
+ ev_timer_start(loop, &timer);
+
+ while (!got_sigterm)
+ {
+ int rc;
+
+// elog(LOG, "ev_loop: pre");
+ ev_loop(loop, 0);
+// elog(LOG, "ev_loop: post");
+
+// elog(LOG, "Waiting...");
+ /*
+ * Background workers mustn't call usleep() or any direct equivalent:
+ * instead, they may wait on their process latch, which sleeps as
+ * necessary, but is awakened if postmaster dies. That way the
+ * background process goes away immediately in an emergency.
+ */
+ rc = WaitLatch(&MyProc->procLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+ 100 /*worker_spi_naptime * 1000L*/);
+ ResetLatch(&MyProc->procLatch);
+
+ /* emergency bailout if postmaster has died */
+ if (rc & WL_POSTMASTER_DEATH)
+ proc_exit(1);
+
+// elog(LOG, "Woke up");
+
+ /*
+ * In case of a SIGHUP, just reload the configuration.
+ */
+ if (got_sighup)
+ {
+ got_sighup = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+
+ /*
+ * Start a transaction on which we can run queries. Note that each
+ * StartTransactionCommand() call should be preceded by a
+ * SetCurrentStatementStartTimestamp() call, which sets both the time
+ * for the statement we're about the run, and also the transaction
+ * start time. Also, each other query sent to SPI should probably be
+ * preceded by SetCurrentStatementStartTimestamp(), so that statement
+ * start time is always up to date.
+ *
+ * The SPI_connect() call lets us run queries through the SPI manager,
+ * and the PushActiveSnapshot() call creates an "active" snapshot
+ * which is necessary for queries to have MVCC data to work on.
+ *
+ * The pgstat_report_activity() call makes our activity visible
+ * through the pgstat views.
+ */
+ /*
+ SetCurrentStatementStartTimestamp();
+ StartTransactionCommand();
+ SPI_connect();
+ PushActiveSnapshot(GetTransactionSnapshot());
+ pgstat_report_activity(STATE_RUNNING, buf.data);
+ */
+
+ /* We can now execute queries via SPI */
+/*
+ ret = SPI_execute(buf.data, false, 0);
+
+ if (ret != SPI_OK_UPDATE_RETURNING)
+ elog(FATAL, "cannot select from table %s.%s: error code %d",
+ table->schema, table->name, ret);
+
+ if (SPI_processed > 0)
+ {
+ bool isnull;
+ int32 val;
+
+ val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
+ SPI_tuptable->tupdesc,
+ 1, &isnull));
+ if (!isnull)
+ elog(LOG, "%s: count in %s.%s is now %d",
+ MyBgworkerEntry->bgw_name,
+ table->schema, table->name, val);
+ }
+*/
+ /*
+ * And finish our transaction.
+ */
+ /*
+ SPI_finish();
+ PopActiveSnapshot();
+ CommitTransactionCommand();
+ pgstat_report_activity(STATE_IDLE, NULL);
+ */
+ }
+
+ elog(LOG, "Stopping httpd");
+
+ proc_exit(0);
+}
+
+void _PG_init(void)
+{
+ BackgroundWorker worker;
+
+ /* get the configuration */
+ DefineCustomIntVariable("httpd.naptime",
+ "Duration between each check (in seconds).",
+ NULL,
+ &worker_spi_naptime,
+ 10,
+ 1,
+ INT_MAX,
+ PGC_SIGHUP,
+ 0,
+ NULL,
+ NULL,
+ NULL);
+
+ /* set up common data for all our workers */
+ worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+ worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
+ worker.bgw_restart_time = 1; // BGW_NEVER_RESTART;
+ worker.bgw_main = worker_spi_main;
+ worker.bgw_sighup = worker_spi_sighup;
+ worker.bgw_sigterm = worker_spi_sigterm;
+ worker.bgw_name = "httpd";
+ worker.bgw_main_arg = NULL;
+
+ RegisterBackgroundWorker(&worker);
+}
diff --git a/libebb b/libebb
new file mode 160000
+Subproject 60cd03c717ffed3e3c55f35de713d11232213e2