aboutsummaryrefslogtreecommitdiff
path: root/sm-http-server.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'sm-http-server.cpp')
-rw-r--r--sm-http-server.cpp238
1 files changed, 238 insertions, 0 deletions
diff --git a/sm-http-server.cpp b/sm-http-server.cpp
new file mode 100644
index 0000000..7ac4e67
--- /dev/null
+++ b/sm-http-server.cpp
@@ -0,0 +1,238 @@
+#include "cassandra_support.h"
+#include "http_support.h"
+#include <nghttp2/asio_http2_server.h>
+#include <iostream>
+#include <string>
+#include <boost/program_options.hpp>
+#include <boost/algorithm/string.hpp>
+#include <boost/algorithm/string/split.hpp>
+#include <cxxabi.h>
+
+using namespace std;
+using namespace nghttp2::asio_http2;
+using namespace nghttp2::asio_http2::server;
+using namespace trygvis::cassandra_support;
+using namespace trygvis::http_support;
+namespace po = boost::program_options;
+
+static unique_ptr<cassandra_session> current_cassandra_session;
+static string keyspace_name = "soil_moisture";
+
+const auto text_plain = header_value{"text/plain"};
+const auto application_json = header_value{"application/json"};
+
+string read_string(const CassRow *row, const size_t index) {
+ const CassValue *value = cass_row_get_column(row, index);
+
+ const char *buf;
+ size_t len;
+ auto err = cass_value_get_string(value, &buf, &len);
+ assert_ok("cass_value_get_string", err);
+
+ return string(buf, len);
+}
+
+cass_int64_t read_value_int64(const CassRow *row, const size_t index) {
+ const CassValue *value = cass_row_get_column(row, index);
+
+ cass_int64_t data;
+ auto err = cass_value_get_int64(value, &data);
+ assert_ok("cass_value_get_int64", err);
+
+ return data;
+}
+
+cass_int32_t read_value_int32(const CassRow *row, const size_t index) {
+ const CassValue *value = cass_row_get_column(row, index);
+
+ cass_int32_t data;
+ auto err = cass_value_get_int32(value, &data);
+ assert_ok("cass_value_get_int32", err);
+
+ return data;
+}
+
+void handle_device_get(const request &req, const response &res, string device) {
+ if(!current_cassandra_session) {
+ header_map headers;
+ headers.emplace("content-type", text_plain);
+ res.write_head(503, headers);
+ res.end("No connection to database.");
+ return;
+ }
+
+ cout << "handle_device_get(" << device << ");" << endl;
+
+ cassandra_statement stmt("SELECT device, timestamp, sensors FROM sm_by_day WHERE device=? AND day IN ?", 2);
+ stmt.bind(0, device);
+ vector<string> days = {"2015-07-10", "2015-07-11", "2015-07-12", "2015-07-13", "2015-07-14", "2015-07-15", "2015-07-16"};
+ stmt.bind(1, std::move(days));
+
+ auto f = cass_session_execute(current_cassandra_session->session, stmt.statement);
+ handle_future(f, [&](auto future) {
+ header_map headers;
+ headers.emplace("content-type", application_json);
+ res.write_head(200, headers);
+
+ const CassResult *result = cass_future_get_result(future);
+ size_t count = cass_result_row_count(result);
+ cout << "row count: " << count << endl;
+ CassIterator *rows = cass_iterator_from_result(result);
+
+ stringstream buf;
+ buf << "[";
+
+ bool first = true;
+ while (cass_iterator_next(rows)) {
+ const CassRow *row = cass_iterator_get_row(rows);
+
+ string d = read_string(row, 0);
+ auto timestamp = read_value_int64(row, 1);
+// auto sensors = read_string(row, 1);
+ int value = -1;
+
+ if (!first) {
+ buf << ",";
+ } else {
+ first = false;
+ }
+ buf << endl << " {device: '" << d << "', timestamp: '" << timestamp << "'}";
+ }
+
+ buf << endl << "]" << endl;
+
+ cass_result_free(result);
+ cass_iterator_free(rows);
+
+ res.end(buf.str() + "\r\n");
+ }, [&](auto future, auto err) {
+ header_map headers;
+ headers.emplace("content-type", text_plain);
+ res.write_head(500, headers);
+
+ stringstream buf;
+ buf << "Bad shit: " << error_message(future) << "\r\n";
+ res.end(buf.str());
+ });
+}
+
+using namespace __cxxabiv1;
+
+std::string util_demangle(std::string to_demangle) {
+ int status = 0;
+ char * buff = __cxxabiv1::__cxa_demangle(to_demangle.c_str(), NULL, NULL, &status);
+ std::string demangled = buff;
+ std::free(buff);
+ return demangled;
+}
+
+void internal_server_error(const response &res, const string &msg) {
+ header_map headers;
+ headers.emplace("content-type", text_plain);
+ res.write_head(500, headers);
+
+ stringstream buf;
+ buf << "Internal server error: " << msg << "\r\n";
+ auto s = buf.str();
+
+ cout << s << endl;
+ res.end(s);
+}
+
+int main(int argc, const char *const argv[]) {
+ string cassandra_cluster;
+ po::options_description all("Options");
+ all.add_options()("cassandra-cluster", po::value<string>(&cassandra_cluster)->default_value("127.0.0.1"));
+
+ po::variables_map vm;
+ try {
+ auto parsed = po::parse_command_line(argc, argv, all);
+ po::store(parsed, vm);
+ po::notify(vm);
+ auto unrecognized = po::collect_unrecognized(parsed.options, po::include_positional);
+
+ if (vm.count("help")) {
+ cerr << all << "\n";
+ return EXIT_FAILURE;
+ }
+
+ if (unrecognized.size()) {
+ cerr << "Unrecognized option: " << unrecognized.at(0) << "\n";
+ return EXIT_FAILURE;
+ }
+
+ } catch (po::required_option &e) {
+ cerr << "Missing required option: " << e.get_option_name() << endl;
+ cerr << all << endl;
+ } catch (po::unknown_option &e) {
+ cerr << e.what() << endl;
+ return EXIT_FAILURE;
+ }
+
+ CassFuture *connect_future = nullptr;
+ CassCluster *cluster = cass_cluster_new();
+ auto session = make_unique<cassandra_session>();
+
+ cass_cluster_set_contact_points(cluster, cassandra_cluster.c_str());
+
+ connect_future = cass_session_connect(session->session, cluster);
+
+ if (cass_future_error_code(connect_future) != CASS_OK) {
+ string s = error_message(connect_future);
+ cerr << "Could not connect to Cassandra:" << s << endl;
+ return EXIT_FAILURE;
+ }
+
+ cout << "Connected to Cassandra" << endl;
+ current_cassandra_session = std::move(session);
+
+ execute_query(current_cassandra_session->session, "USE " + keyspace_name);
+ boost::system::error_code ec;
+ http2 server;
+ server.num_threads(4);
+
+ server.handle("/", [](const request &req, const response &res) {
+ cerr << req.method() << " " << req.uri().path << endl;
+
+ vector<string> paths;
+ auto &path = req.uri().path;
+ boost::algorithm::split(paths, path, boost::algorithm::is_any_of("/"), boost::algorithm::token_compress_on);
+
+ if (paths.begin()->size() == 0) {
+ paths.erase(paths.begin());
+ }
+
+ try {
+ for (auto &p : paths) {
+ cout << "path ->" << p << "<-" << endl;
+ }
+
+ param device("device");
+ if (matches(paths, "device", device)) {
+ if (req.method() == "GET") {
+ handle_device_get(req, res, device.value);
+ } else {
+ method_not_allowed(req, res);
+ }
+ } else {
+ res.write_head(404);
+ res.end("Not found :(\r\n");
+ }
+ } catch (const exception& ex) {
+ internal_server_error(res, ex.what());
+ } catch (const string& ex) {
+ internal_server_error(res, ex);
+ } catch (...) {
+ auto type = util_demangle(__cxa_current_exception_type()->name());
+ internal_server_error(res, "Unknown exception, type: " + type);
+ }
+ });
+
+ std::cerr << "Starting server" << endl;
+ if (server.listen_and_serve(ec, "127.0.0.1", "3000")) {
+ std::cerr << "error: " << ec.message() << std::endl;
+ }
+ std::cerr << "woot?" << endl;
+
+ return EXIT_SUCCESS;
+}