diff options
-rw-r--r-- | .gitignore | 3 | ||||
-rw-r--r-- | .gitmodules | 5 | ||||
-rw-r--r-- | Makefile | 30 | ||||
-rw-r--r-- | README.md | 15 | ||||
-rw-r--r-- | data.sql | 12 | ||||
-rw-r--r-- | httpd.c | 560 | ||||
m--------- | libebb | 0 |
7 files changed, 625 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f2a9932 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +data/ +*.so +*.o diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..0e61cdf --- /dev/null +++ b/.gitmodules @@ -0,0 +1,5 @@ +[submodule "libebb"] + path = libebb + url = https://github.com/evanphx/libebb.git +# Alternate URL for libebb which has a bug with EV_ERROR: +# https://github.com/evanphx/libebb.git diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..3f82144 --- /dev/null +++ b/Makefile @@ -0,0 +1,30 @@ +POSTGRESQL ?= $(HOME)/opt/postgresql-git + +PG_CPPFLAGS = -Imongoose -Ilibebb +SHLIB_LINK = -lev + +MODULE_big = httpd +OBJS=httpd.o libebb/libebb.a +# mongoose/mongoose.o + +PG_CONFIG = $(POSTGRESQL)/bin/pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) + +httpd.o: httpd.c +libebb/libebb.a: + $(MAKE) -C libebb LIBS=-lev + +# Utility targets + +reinitdb: + rm -rf data + $(MAKE) initdb + +initdb: + $(POSTGRESQL)/bin/initdb -D data -E utf8 + echo "port=5433" >> data/postgresql.conf + echo "shared_preload_libraries='httpd'" >> data/postgresql.conf + +run: + $(POSTGRESQL)/bin/postgres -D data diff --git a/README.md b/README.md new file mode 100644 index 0000000..fa3b02b --- /dev/null +++ b/README.md @@ -0,0 +1,15 @@ +This is me mainly scratching an itch, but it is a small proof of +concept on how possible it is to embed a HTTP server into PostgreSQL +as a background worker. + +Right now it uses libebb as an event-driven HTTP server. As the +background workers only have a single connection to the database, one +process can only serve one client at a time. Libebb support multiple +connection, you better not connect multiple times. + +Other possible HTTP implementations: + + * https://github.com/valenok/mongoose.git - tried it, got complicated + because it wanted to control forking. + * https://github.com/joyent/http-parser - a plain HTTP parser, might + be useful if I end up with a custom HTTP server. diff --git a/data.sql b/data.sql new file mode 100644 index 0000000..2c68069 --- /dev/null +++ b/data.sql @@ -0,0 +1,12 @@ +BEGIN; +DROP TABLE IF EXISTS mytable; +CREATE TABLE mytable ( + id serial, + text varchar(100) +); + +INSERT INTO mytable(text) +SELECT md5(random()::text) FROM generate_series(1, 10) +-- returning * +; +COMMIT; @@ -0,0 +1,560 @@ +#include <string.h> +#include <stdlib.h> +#include <sys/types.h> +#include <unistd.h> +#include <assert.h> + +/////////////////////////////////////// + +#include "ebb.h" +struct ev_loop *loop = NULL; + +// TODO: register a handler for new_buf that uses palloc +// TODO: handle errors, figure out how to use ebb_request_parser_has_error + +/////////////////////////////////////// + +#include "postgres.h" + +#include "utils/portal.h" + +/* These are always necessary for a bgworker */ +#include "miscadmin.h" +#include "postmaster/bgworker.h" +#include "storage/ipc.h" +#include "storage/latch.h" +#include "storage/lwlock.h" +#include "storage/proc.h" +#include "storage/shmem.h" + +/* these headers are used by this particular worker's code */ +#include "access/xact.h" +#include "executor/spi.h" +#include "fmgr.h" +#include "lib/stringinfo.h" +#include "pgstat.h" +#include "utils/builtins.h" +#include "utils/snapmgr.h" +#include "tcop/utility.h" + +PG_MODULE_MAGIC; + +void _PG_init(void); + +/* flags set by signal handlers */ +static volatile sig_atomic_t got_sighup = false; +static volatile sig_atomic_t got_sigterm = false; + +/* GUC variables */ +static int worker_spi_naptime = 10; + +/* If the plan should be read-only */ +bool read_only = false; + +struct { + char *path; + char *uri; + char *query_string; + char *query; + char *field, *value; + bool do_close; +} http_state; + +struct { + SPIPlanPtr plan; + Portal portal; + bool is_cursor; +} sql_state; + +static int start_query() { + int ret; + + bzero(&sql_state, sizeof(sql_state)); + elog(LOG, "start_query: %s", http_state.query); + + /* Connect to the database */ + SetCurrentStatementStartTimestamp(); + StartTransactionCommand(); + ret = SPI_connect(); + if(ret != SPI_OK_CONNECT) { + elog(FATAL, "Could not connect to server: %s", SPI_result_code_string(ret)); + return 1; + } + PushActiveSnapshot(GetTransactionSnapshot()); + pgstat_report_activity(STATE_RUNNING, http_state.query); + + elog(LOG, "SPI_prepare_cursor"); + sql_state.plan = SPI_prepare_cursor(http_state.query, + 0, /* No arguments */ + NULL, /* Empty array of arguments */ + CURSOR_OPT_NO_SCROLL); + if(SPI_result != 0) { + elog(LOG, "SPI_prepare_cursor: SPI_result=%s", SPI_result_code_string(SPI_result)); + return 1; + } + + sql_state.is_cursor = SPI_is_cursor_plan(sql_state.plan); + if(sql_state.is_cursor) { + sql_state.portal = SPI_cursor_open(NULL, + sql_state.plan, + NULL, /* No argument values */ + NULL, /* No arguments that are NULL */ + read_only); /* This tx is read only */ + if(SPI_result != 0) { + elog(LOG, "SPI_cursor_open: SPI_result=%s", SPI_result_code_string(SPI_result)); + return 1; + } + elog(LOG, "SPI_cursor_open: SPI_processed = %d, SPI_tuptable = %p", SPI_processed, SPI_tuptable); + } else { + ret = SPI_execute_plan(sql_state.plan, NULL, NULL, read_only, 0); + if(ret < 0) { + elog(LOG, "SPI_execute_plan: SPI_result=%s", SPI_result_code_string(SPI_result)); + return 1; + } + elog(LOG, "SPI_execute_plan: SPI_processed=%d", SPI_processed); + } + + return 0; +} + +static void complete_query() { + elog(LOG, "complete_query"); + if(sql_state.plan != NULL) { + SPI_freeplan(sql_state.plan); + } + SPI_finish(); + PopActiveSnapshot(); + CommitTransactionCommand(); + pgstat_report_activity(STATE_IDLE, NULL); +} + +/*** + * Web server handler + */ + +/* +#define MSG ("HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 12\r\n\r\nhello world\n") +static int c = 0; + +struct hello_connection { + unsigned int responses_to_write; +}; +*/ + +static void on_close(ebb_connection *connection) +{ + elog(LOG, "on_close"); + free(connection->data); + free(connection); +} + +static void continue_responding(ebb_connection *connection) +{ + int i, j; + StringInfoData buf; + TupleDesc tupleDesc; + + if(http_state.do_close) { + goto complete; + } + + initStringInfo(&buf); + + if(sql_state.is_cursor) { + SPI_cursor_fetch(sql_state.portal, true /* forward */, 3 /* count */); + elog(LOG, "SPI_cursor_fetch: SPI_processed = %d, SPI_tuptable = %p", SPI_processed, SPI_tuptable); + if(SPI_result != 0) { + elog(FATAL, "SPI_cursor_fetch: SPI_result: %s", SPI_result_code_string(SPI_result)); + goto complete; + } + } + + elog(LOG, "SPI_cursor_fetch: SPI_processed = %d, SPI_tuptable = %p", SPI_processed, SPI_tuptable); + if(SPI_processed == 0 || SPI_tuptable == NULL) { + goto complete; + } + + tupleDesc = SPI_tuptable->tupdesc; + + for(i = 0; i < SPI_processed; i++) { + for (j = 0; j < tupleDesc->natts; j++) { + char *value; + if (tupleDesc->attrs[j]->attisdropped) { + continue; + } + + if(j > 0) { + appendStringInfoChar(&buf, ';'); + } + value = SPI_getvalue(SPI_tuptable->vals[i], tupleDesc, j + 1); + switch(SPI_result) { + case 0: + if(value != NULL) { + appendStringInfoString(&buf, value); + } + break; + case SPI_ERROR_NOOUTFUNC: + // no function to handle the conversion to text + break; + default: + elog(LOG, "SPI_result=%s", SPI_result_code_string(SPI_result)); + complete_query(); + ebb_connection_schedule_close(connection); + } + if(value != NULL) { + pfree(value); + } + } + appendStringInfoChar(&buf, '\r'); + appendStringInfoChar(&buf, '\n'); + } + + ebb_connection_write(connection, buf.data, buf.len, continue_responding); + return; + +complete: + complete_query(); + ebb_connection_schedule_close(connection); +} + +static void request_complete(ebb_request *request) +{ + ebb_connection *connection; + int i; + + //printf("request complete \n"); + elog(LOG, "request complete"); + connection = request->data; + //connection_data = connection->data; + + /* + if(ebb_request_should_keep_alive(request)) + connection_data->responses_to_write++; + else + connection_data->responses_to_write = 1; + */ + + if(start_query()) { + StringInfoData buf; + + initStringInfo(&buf); + appendStringInfo(&buf, "500 Internal Server Error: %s", SPI_result_code_string(SPI_result)); + appendStringInfoString(&buf, "Content-Length: 0"); + http_state.do_close = true; + ebb_connection_write(connection, buf.data, buf.len, continue_responding); + } + else { + StringInfoData buf; + TupleDesc tupleDesc; + if(sql_state.is_cursor) { + tupleDesc = sql_state.portal->tupDesc; + } + else { + tupleDesc = SPI_tuptable->tupdesc; + } + + initStringInfo(&buf); + for (i = 0; i < tupleDesc->natts; i++) { + if (tupleDesc->attrs[i]->attisdropped) + continue; + + if (i > 0) { + appendStringInfoChar(&buf, ';'); + } + appendStringInfoString(&buf, NameStr(tupleDesc->attrs[i]->attname)); + } + appendStringInfoChar(&buf, '\r'); + appendStringInfoChar(&buf, '\n'); + ebb_connection_write(connection, buf.data, buf.len, continue_responding); + } + free(request); +} + +int next_is_query = 0; + +static void on_header_field(ebb_request* req, const char *at, size_t length, int header_index) { + // elog(LOG, "on_header_field: at=%s, length=%zu, header_index=%d", at, length, header_index); + http_state.field = palloc(length + 1); + strncpy(http_state.field, at, length); + http_state.field[length] = '\0'; + if(strncasecmp("query", http_state.field, length) == 0) { + next_is_query = 1; + } +} + +static void on_header_value(ebb_request* req, const char *at, size_t length, int header_index) { + // elog(LOG, "on_header_value: at=%s, length=%zu, header_index=%d", at, length, header_index); + http_state.value = palloc(length + 1); + strncpy(http_state.value, at, length); + http_state.value[length] = '\0'; + + if(next_is_query == 1) { + http_state.query = http_state.value; + } + elog(LOG, "Header: %s: %s", http_state.field, http_state.value); +} + +static void on_path(ebb_request* req, const char *at, size_t length) { + // elog(LOG, "on_header_value: at=%s, length=%zu, header_index=%d", at, length, header_index); + http_state.path = palloc(length + 1); + strncpy(http_state.path, at, length); + http_state.path[length] = '\0'; + + elog(LOG, "Path: %s", http_state.path); +} + +static void on_uri(ebb_request* req, const char *at, size_t length) { + // elog(LOG, "on_header_value: at=%s, length=%zu, header_index=%d", at, length, header_index); + http_state.uri = palloc(length + 1); + strncpy(http_state.uri, at, length); + http_state.uri[length] = '\0'; + + elog(LOG, "Uri: %s", http_state.uri); +} + +static void on_query_string(ebb_request* req, const char *at, size_t length) { + // elog(LOG, "on_header_value: at=%s, length=%zu, header_index=%d", at, length, header_index); + http_state.query_string = palloc(length + 1); + strncpy(http_state.query_string, at, length); + http_state.query_string[length] = '\0'; + + elog(LOG, "Query string: %s", http_state.query_string); +} + +static ebb_request* new_request(ebb_connection *connection) +{ + ebb_request *request; + + bzero(&http_state, sizeof(http_state)); + + //printf("request %d\n", ++c); + elog(LOG, "request"); + request = malloc(sizeof(ebb_request)); + ebb_request_init(request); + request->data = connection; + request->on_complete = request_complete; + request->on_path = on_path; + request->on_uri = on_uri; + request->on_query_string = on_query_string; + request->on_header_field = on_header_field; + request->on_header_value = on_header_value; + return request; +} + +static ebb_connection* new_connection(ebb_server *server, struct sockaddr_in *addr) +{ + /* + struct hello_connection *connection_data = malloc(sizeof(struct hello_connection)); + */ + ebb_connection *connection; + + /* + if(connection_data == NULL) + return NULL; + connection_data->responses_to_write = 0; + */ + + connection = malloc(sizeof(ebb_connection)); + if(connection == NULL) { + //free(connection_data); + return NULL; + } + + ebb_connection_init(connection); + // connection->data = connection_data; + connection->new_request = new_request; + connection->on_close = on_close; + + //printf("connection: %d\n", c++); + elog(LOG, "connection"); + return connection; +} + +/*** + * Postgresql background worker handlers + */ + +static void worker_spi_sigterm(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_sigterm = true; + elog(LOG, "httpd: got sigterm"); + if (MyProc) + SetLatch(&MyProc->procLatch); + errno = save_errno; + + if(loop) + ev_unloop(loop, EVUNLOOP_ALL); +} + +static void worker_spi_sighup(SIGNAL_ARGS) +{ + got_sighup = true; + elog(LOG, "httpd: got sighup"); + if (MyProc) + SetLatch(&MyProc->procLatch); + + if(loop) + ev_unloop(loop, EVUNLOOP_ALL); +} + +static void timer_callback(EV_P_ ev_timer *w, int revents) { +// elog(LOG, "timer_callback"); + if(loop) + ev_unloop(loop, EVUNLOOP_ALL); +} + +static void worker_spi_main(void *main_arg) +{ +// StringInfoData buf; + struct ev_timer timer; + ebb_server server; + int port = 8080; + + loop = ev_default_loop(0); + ebb_server_init(&server, loop); + //ebb_server_set_secure(&server, "examples/ca-cert.pem", "examples/ca-key.pem"); + server.new_connection = new_connection; + server.data = NULL; + + elog(LOG, "httpd worker starting, pid = %d", getpid()); + ebb_server_listen_on_port(&server, port); + elog(LOG, "httpd ready, listening on port %d", port); + + BackgroundWorkerUnblockSignals(); + + BackgroundWorkerInitializeConnection("postgres", NULL); + + // do_query(); + + ev_timer_init(&timer, timer_callback, 1, 1); + ev_timer_start(loop, &timer); + + while (!got_sigterm) + { + int rc; + +// elog(LOG, "ev_loop: pre"); + ev_loop(loop, 0); +// elog(LOG, "ev_loop: post"); + +// elog(LOG, "Waiting..."); + /* + * Background workers mustn't call usleep() or any direct equivalent: + * instead, they may wait on their process latch, which sleeps as + * necessary, but is awakened if postmaster dies. That way the + * background process goes away immediately in an emergency. + */ + rc = WaitLatch(&MyProc->procLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + 100 /*worker_spi_naptime * 1000L*/); + ResetLatch(&MyProc->procLatch); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + +// elog(LOG, "Woke up"); + + /* + * In case of a SIGHUP, just reload the configuration. + */ + if (got_sighup) + { + got_sighup = false; + ProcessConfigFile(PGC_SIGHUP); + } + + /* + * Start a transaction on which we can run queries. Note that each + * StartTransactionCommand() call should be preceded by a + * SetCurrentStatementStartTimestamp() call, which sets both the time + * for the statement we're about the run, and also the transaction + * start time. Also, each other query sent to SPI should probably be + * preceded by SetCurrentStatementStartTimestamp(), so that statement + * start time is always up to date. + * + * The SPI_connect() call lets us run queries through the SPI manager, + * and the PushActiveSnapshot() call creates an "active" snapshot + * which is necessary for queries to have MVCC data to work on. + * + * The pgstat_report_activity() call makes our activity visible + * through the pgstat views. + */ + /* + SetCurrentStatementStartTimestamp(); + StartTransactionCommand(); + SPI_connect(); + PushActiveSnapshot(GetTransactionSnapshot()); + pgstat_report_activity(STATE_RUNNING, buf.data); + */ + + /* We can now execute queries via SPI */ +/* + ret = SPI_execute(buf.data, false, 0); + + if (ret != SPI_OK_UPDATE_RETURNING) + elog(FATAL, "cannot select from table %s.%s: error code %d", + table->schema, table->name, ret); + + if (SPI_processed > 0) + { + bool isnull; + int32 val; + + val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0], + SPI_tuptable->tupdesc, + 1, &isnull)); + if (!isnull) + elog(LOG, "%s: count in %s.%s is now %d", + MyBgworkerEntry->bgw_name, + table->schema, table->name, val); + } +*/ + /* + * And finish our transaction. + */ + /* + SPI_finish(); + PopActiveSnapshot(); + CommitTransactionCommand(); + pgstat_report_activity(STATE_IDLE, NULL); + */ + } + + elog(LOG, "Stopping httpd"); + + proc_exit(0); +} + +void _PG_init(void) +{ + BackgroundWorker worker; + + /* get the configuration */ + DefineCustomIntVariable("httpd.naptime", + "Duration between each check (in seconds).", + NULL, + &worker_spi_naptime, + 10, + 1, + INT_MAX, + PGC_SIGHUP, + 0, + NULL, + NULL, + NULL); + + /* set up common data for all our workers */ + worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; + worker.bgw_start_time = BgWorkerStart_RecoveryFinished; + worker.bgw_restart_time = 1; // BGW_NEVER_RESTART; + worker.bgw_main = worker_spi_main; + worker.bgw_sighup = worker_spi_sighup; + worker.bgw_sigterm = worker_spi_sigterm; + worker.bgw_name = "httpd"; + worker.bgw_main_arg = NULL; + + RegisterBackgroundWorker(&worker); +} diff --git a/libebb b/libebb new file mode 160000 +Subproject 60cd03c717ffed3e3c55f35de713d11232213e2 |