#include #include #include #include #include /////////////////////////////////////// #include "ebb.h" struct ev_loop *loop = NULL; // TODO: register a handler for new_buf that uses palloc // TODO: handle errors, figure out how to use ebb_request_parser_has_error /////////////////////////////////////// #include "postgres.h" #include "utils/portal.h" /* These are always necessary for a bgworker */ #include "miscadmin.h" #include "postmaster/bgworker.h" #include "storage/ipc.h" #include "storage/latch.h" #include "storage/lwlock.h" #include "storage/proc.h" #include "storage/shmem.h" /* these headers are used by this particular worker's code */ #include "access/xact.h" #include "executor/spi.h" #include "fmgr.h" #include "lib/stringinfo.h" #include "pgstat.h" #include "utils/builtins.h" #include "utils/snapmgr.h" #include "tcop/utility.h" PG_MODULE_MAGIC; void _PG_init(void); /* flags set by signal handlers */ static volatile sig_atomic_t got_sighup = false; static volatile sig_atomic_t got_sigterm = false; /* GUC variables */ static int worker_spi_naptime = 10; /* If the plan should be read-only */ bool read_only = false; struct { char *path; char *uri; char *query_string; char *query; char *field, *value; bool do_close; } http_state; struct { SPIPlanPtr plan; Portal portal; bool is_cursor; } sql_state; static int start_query() { int ret; bzero(&sql_state, sizeof(sql_state)); elog(LOG, "start_query: %s", http_state.query); /* Connect to the database */ SetCurrentStatementStartTimestamp(); StartTransactionCommand(); ret = SPI_connect(); if(ret != SPI_OK_CONNECT) { elog(FATAL, "Could not connect to server: %s", SPI_result_code_string(ret)); return 1; } PushActiveSnapshot(GetTransactionSnapshot()); pgstat_report_activity(STATE_RUNNING, http_state.query); elog(LOG, "SPI_prepare_cursor"); sql_state.plan = SPI_prepare_cursor(http_state.query, 0, /* No arguments */ NULL, /* Empty array of arguments */ CURSOR_OPT_NO_SCROLL); if(SPI_result != 0) { elog(LOG, "SPI_prepare_cursor: SPI_result=%s", SPI_result_code_string(SPI_result)); return 1; } sql_state.is_cursor = SPI_is_cursor_plan(sql_state.plan); if(sql_state.is_cursor) { sql_state.portal = SPI_cursor_open(NULL, sql_state.plan, NULL, /* No argument values */ NULL, /* No arguments that are NULL */ read_only); /* This tx is read only */ if(SPI_result != 0) { elog(LOG, "SPI_cursor_open: SPI_result=%s", SPI_result_code_string(SPI_result)); return 1; } elog(LOG, "SPI_cursor_open: SPI_processed = %d, SPI_tuptable = %p", SPI_processed, SPI_tuptable); } else { ret = SPI_execute_plan(sql_state.plan, NULL, NULL, read_only, 0); if(ret < 0) { elog(LOG, "SPI_execute_plan: SPI_result=%s", SPI_result_code_string(SPI_result)); return 1; } elog(LOG, "SPI_execute_plan: SPI_processed=%d, SPI_tuptable=%p", SPI_processed, SPI_tuptable); } return 0; } static void complete_query() { elog(LOG, "complete_query"); if(sql_state.plan != NULL) { SPI_freeplan(sql_state.plan); } SPI_finish(); PopActiveSnapshot(); CommitTransactionCommand(); pgstat_report_activity(STATE_IDLE, NULL); } /*** * Web server handler */ /* #define MSG ("HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 12\r\n\r\nhello world\n") static int c = 0; struct hello_connection { unsigned int responses_to_write; }; */ static void on_close(ebb_connection *connection) { elog(LOG, "on_close"); free(connection->data); free(connection); } static void continue_responding(ebb_connection *connection) { int i, j; StringInfoData buf; TupleDesc tupleDesc; if(http_state.do_close) { goto complete; } initStringInfo(&buf); if(sql_state.is_cursor) { SPI_cursor_fetch(sql_state.portal, true /* forward */, 3 /* count */); // elog(LOG, "SPI_cursor_fetch: SPI_processed = %d, SPI_tuptable = %p", SPI_processed, SPI_tuptable); if(SPI_result != 0) { elog(FATAL, "SPI_cursor_fetch: SPI_result: %s", SPI_result_code_string(SPI_result)); goto complete; } } if(SPI_processed == 0 || SPI_tuptable == NULL) { goto complete; } tupleDesc = SPI_tuptable->tupdesc; for(i = 0; i < SPI_processed; i++) { for (j = 0; j < tupleDesc->natts; j++) { char *value; if (tupleDesc->attrs[j]->attisdropped) { continue; } if(j > 0) { appendStringInfoChar(&buf, ';'); } value = SPI_getvalue(SPI_tuptable->vals[i], tupleDesc, j + 1); switch(SPI_result) { case 0: if(value != NULL) { appendStringInfoString(&buf, value); } break; case SPI_ERROR_NOOUTFUNC: // no function to handle the conversion to text break; default: elog(LOG, "SPI_result=%s", SPI_result_code_string(SPI_result)); complete_query(); ebb_connection_schedule_close(connection); } if(value != NULL) { pfree(value); } } appendStringInfoChar(&buf, '\r'); appendStringInfoChar(&buf, '\n'); } ebb_connection_write(connection, buf.data, buf.len, continue_responding); return; complete: complete_query(); ebb_connection_schedule_close(connection); } static void request_complete(ebb_request *request) { ebb_connection *connection; int i; StringInfoData buf; TupleDesc tupleDesc; //printf("request complete \n"); elog(LOG, "request complete"); connection = request->data; //connection_data = connection->data; /* if(ebb_request_should_keep_alive(request)) connection_data->responses_to_write++; else connection_data->responses_to_write = 1; */ initStringInfo(&buf); if(start_query()) { appendStringInfo(&buf, "HTTP/1.1 500 Internal Server Error: %s\r\n", SPI_result_code_string(SPI_result)); appendStringInfoString(&buf, "Content-Length: 0\r\n"); appendStringInfoString(&buf, "\r\n"); http_state.do_close = true; ebb_connection_write(connection, buf.data, buf.len, continue_responding); goto done; } appendStringInfoString(&buf, "HTTP/1.1 200 OK\r\n"); if(sql_state.is_cursor || SPI_tuptable) { tupleDesc = sql_state.is_cursor ? sql_state.portal->tupDesc : SPI_tuptable->tupdesc; appendStringInfoString(&buf, "Content-Type: text/csv\r\n"); appendStringInfoString(&buf, "\r\n"); for (i = 0; i < tupleDesc->natts; i++) { if (tupleDesc->attrs[i]->attisdropped) continue; if (i > 0) { appendStringInfoChar(&buf, ';'); } appendStringInfoString(&buf, NameStr(tupleDesc->attrs[i]->attname)); appendStringInfoString(&buf, "\r\n"); } } else { appendStringInfoString(&buf, "Content-Length: 0\r\n"); appendStringInfoString(&buf, "\r\n"); http_state.do_close = true; } ebb_connection_write(connection, buf.data, buf.len, continue_responding); done: free(request); } int next_is_query = 0; static void on_header_field(ebb_request* req, const char *at, size_t length, int header_index) { // elog(LOG, "on_header_field: at=%s, length=%zu, header_index=%d", at, length, header_index); http_state.field = palloc(length + 1); strncpy(http_state.field, at, length); http_state.field[length] = '\0'; if(strncasecmp("query", http_state.field, length) == 0) { next_is_query = 1; } } static void on_header_value(ebb_request* req, const char *at, size_t length, int header_index) { // elog(LOG, "on_header_value: at=%s, length=%zu, header_index=%d", at, length, header_index); http_state.value = palloc(length + 1); strncpy(http_state.value, at, length); http_state.value[length] = '\0'; if(next_is_query == 1) { http_state.query = http_state.value; } elog(LOG, "Header: %s: %s", http_state.field, http_state.value); } static void on_path(ebb_request* req, const char *at, size_t length) { // elog(LOG, "on_header_value: at=%s, length=%zu, header_index=%d", at, length, header_index); http_state.path = palloc(length + 1); strncpy(http_state.path, at, length); http_state.path[length] = '\0'; elog(LOG, "Path: %s", http_state.path); } static void on_uri(ebb_request* req, const char *at, size_t length) { // elog(LOG, "on_header_value: at=%s, length=%zu, header_index=%d", at, length, header_index); http_state.uri = palloc(length + 1); strncpy(http_state.uri, at, length); http_state.uri[length] = '\0'; elog(LOG, "Uri: %s", http_state.uri); } static void on_query_string(ebb_request* req, const char *at, size_t length) { // elog(LOG, "on_header_value: at=%s, length=%zu, header_index=%d", at, length, header_index); http_state.query_string = palloc(length + 1); strncpy(http_state.query_string, at, length); http_state.query_string[length] = '\0'; elog(LOG, "Query string: %s", http_state.query_string); } static ebb_request* new_request(ebb_connection *connection) { ebb_request *request; bzero(&http_state, sizeof(http_state)); //printf("request %d\n", ++c); elog(LOG, "request"); request = malloc(sizeof(ebb_request)); ebb_request_init(request); request->data = connection; request->on_complete = request_complete; request->on_path = on_path; request->on_uri = on_uri; request->on_query_string = on_query_string; request->on_header_field = on_header_field; request->on_header_value = on_header_value; return request; } static ebb_connection* new_connection(ebb_server *server, struct sockaddr_in *addr) { /* struct hello_connection *connection_data = malloc(sizeof(struct hello_connection)); */ ebb_connection *connection; /* if(connection_data == NULL) return NULL; connection_data->responses_to_write = 0; */ connection = malloc(sizeof(ebb_connection)); if(connection == NULL) { //free(connection_data); return NULL; } ebb_connection_init(connection); // connection->data = connection_data; connection->new_request = new_request; connection->on_close = on_close; //printf("connection: %d\n", c++); elog(LOG, "connection"); return connection; } /*** * Postgresql background worker handlers */ static void worker_spi_sigterm(SIGNAL_ARGS) { int save_errno = errno; got_sigterm = true; elog(LOG, "httpd: got sigterm"); if (MyProc) SetLatch(&MyProc->procLatch); errno = save_errno; if(loop) ev_unloop(loop, EVUNLOOP_ALL); } static void worker_spi_sighup(SIGNAL_ARGS) { got_sighup = true; elog(LOG, "httpd: got sighup"); if (MyProc) SetLatch(&MyProc->procLatch); if(loop) ev_unloop(loop, EVUNLOOP_ALL); } static void timer_callback(EV_P_ ev_timer *w, int revents) { // elog(LOG, "timer_callback"); if(loop) ev_unloop(loop, EVUNLOOP_ALL); } static void worker_spi_main(void *main_arg) { // StringInfoData buf; struct ev_timer timer; ebb_server server; int port = 8080; loop = ev_default_loop(0); ebb_server_init(&server, loop); //ebb_server_set_secure(&server, "examples/ca-cert.pem", "examples/ca-key.pem"); server.new_connection = new_connection; server.data = NULL; elog(LOG, "httpd worker starting, pid = %d", getpid()); ebb_server_listen_on_port(&server, port); elog(LOG, "httpd ready, listening on port %d", port); BackgroundWorkerUnblockSignals(); BackgroundWorkerInitializeConnection("postgres", NULL); // do_query(); ev_timer_init(&timer, timer_callback, 1, 1); ev_timer_start(loop, &timer); while (!got_sigterm) { int rc; // elog(LOG, "ev_loop: pre"); ev_loop(loop, 0); // elog(LOG, "ev_loop: post"); // elog(LOG, "Waiting..."); /* * Background workers mustn't call usleep() or any direct equivalent: * instead, they may wait on their process latch, which sleeps as * necessary, but is awakened if postmaster dies. That way the * background process goes away immediately in an emergency. */ rc = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 100 /*worker_spi_naptime * 1000L*/); ResetLatch(&MyProc->procLatch); /* emergency bailout if postmaster has died */ if (rc & WL_POSTMASTER_DEATH) proc_exit(1); // elog(LOG, "Woke up"); /* * In case of a SIGHUP, just reload the configuration. */ if (got_sighup) { got_sighup = false; ProcessConfigFile(PGC_SIGHUP); } /* * Start a transaction on which we can run queries. Note that each * StartTransactionCommand() call should be preceded by a * SetCurrentStatementStartTimestamp() call, which sets both the time * for the statement we're about the run, and also the transaction * start time. Also, each other query sent to SPI should probably be * preceded by SetCurrentStatementStartTimestamp(), so that statement * start time is always up to date. * * The SPI_connect() call lets us run queries through the SPI manager, * and the PushActiveSnapshot() call creates an "active" snapshot * which is necessary for queries to have MVCC data to work on. * * The pgstat_report_activity() call makes our activity visible * through the pgstat views. */ /* SetCurrentStatementStartTimestamp(); StartTransactionCommand(); SPI_connect(); PushActiveSnapshot(GetTransactionSnapshot()); pgstat_report_activity(STATE_RUNNING, buf.data); */ /* We can now execute queries via SPI */ /* ret = SPI_execute(buf.data, false, 0); if (ret != SPI_OK_UPDATE_RETURNING) elog(FATAL, "cannot select from table %s.%s: error code %d", table->schema, table->name, ret); if (SPI_processed > 0) { bool isnull; int32 val; val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &isnull)); if (!isnull) elog(LOG, "%s: count in %s.%s is now %d", MyBgworkerEntry->bgw_name, table->schema, table->name, val); } */ /* * And finish our transaction. */ /* SPI_finish(); PopActiveSnapshot(); CommitTransactionCommand(); pgstat_report_activity(STATE_IDLE, NULL); */ } elog(LOG, "Stopping httpd"); proc_exit(0); } void _PG_init(void) { BackgroundWorker worker; /* get the configuration */ DefineCustomIntVariable("httpd.naptime", "Duration between each check (in seconds).", NULL, &worker_spi_naptime, 10, 1, INT_MAX, PGC_SIGHUP, 0, NULL, NULL, NULL); /* set up common data for all our workers */ worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; worker.bgw_start_time = BgWorkerStart_RecoveryFinished; worker.bgw_restart_time = 1; // BGW_NEVER_RESTART; worker.bgw_main = worker_spi_main; worker.bgw_sighup = worker_spi_sighup; worker.bgw_sigterm = worker_spi_sigterm; worker.bgw_name = "httpd"; worker.bgw_main_arg = NULL; RegisterBackgroundWorker(&worker); }