From d77ebb924c1eeca345bbb3f1eeb2df3058a52a18 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Wed, 15 Jul 2015 20:03:18 +0200 Subject: o Renaming binaries. --- sm-http-server.cpp | 238 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 238 insertions(+) create mode 100644 sm-http-server.cpp (limited to 'sm-http-server.cpp') diff --git a/sm-http-server.cpp b/sm-http-server.cpp new file mode 100644 index 0000000..7ac4e67 --- /dev/null +++ b/sm-http-server.cpp @@ -0,0 +1,238 @@ +#include "cassandra_support.h" +#include "http_support.h" +#include +#include +#include +#include +#include +#include +#include + +using namespace std; +using namespace nghttp2::asio_http2; +using namespace nghttp2::asio_http2::server; +using namespace trygvis::cassandra_support; +using namespace trygvis::http_support; +namespace po = boost::program_options; + +static unique_ptr current_cassandra_session; +static string keyspace_name = "soil_moisture"; + +const auto text_plain = header_value{"text/plain"}; +const auto application_json = header_value{"application/json"}; + +string read_string(const CassRow *row, const size_t index) { + const CassValue *value = cass_row_get_column(row, index); + + const char *buf; + size_t len; + auto err = cass_value_get_string(value, &buf, &len); + assert_ok("cass_value_get_string", err); + + return string(buf, len); +} + +cass_int64_t read_value_int64(const CassRow *row, const size_t index) { + const CassValue *value = cass_row_get_column(row, index); + + cass_int64_t data; + auto err = cass_value_get_int64(value, &data); + assert_ok("cass_value_get_int64", err); + + return data; +} + +cass_int32_t read_value_int32(const CassRow *row, const size_t index) { + const CassValue *value = cass_row_get_column(row, index); + + cass_int32_t data; + auto err = cass_value_get_int32(value, &data); + assert_ok("cass_value_get_int32", err); + + return data; +} + +void handle_device_get(const request &req, const response &res, string device) { + if(!current_cassandra_session) { + header_map headers; + headers.emplace("content-type", text_plain); + res.write_head(503, headers); + res.end("No connection to database."); + return; + } + + cout << "handle_device_get(" << device << ");" << endl; + + cassandra_statement stmt("SELECT device, timestamp, sensors FROM sm_by_day WHERE device=? AND day IN ?", 2); + stmt.bind(0, device); + vector days = {"2015-07-10", "2015-07-11", "2015-07-12", "2015-07-13", "2015-07-14", "2015-07-15", "2015-07-16"}; + stmt.bind(1, std::move(days)); + + auto f = cass_session_execute(current_cassandra_session->session, stmt.statement); + handle_future(f, [&](auto future) { + header_map headers; + headers.emplace("content-type", application_json); + res.write_head(200, headers); + + const CassResult *result = cass_future_get_result(future); + size_t count = cass_result_row_count(result); + cout << "row count: " << count << endl; + CassIterator *rows = cass_iterator_from_result(result); + + stringstream buf; + buf << "["; + + bool first = true; + while (cass_iterator_next(rows)) { + const CassRow *row = cass_iterator_get_row(rows); + + string d = read_string(row, 0); + auto timestamp = read_value_int64(row, 1); +// auto sensors = read_string(row, 1); + int value = -1; + + if (!first) { + buf << ","; + } else { + first = false; + } + buf << endl << " {device: '" << d << "', timestamp: '" << timestamp << "'}"; + } + + buf << endl << "]" << endl; + + cass_result_free(result); + cass_iterator_free(rows); + + res.end(buf.str() + "\r\n"); + }, [&](auto future, auto err) { + header_map headers; + headers.emplace("content-type", text_plain); + res.write_head(500, headers); + + stringstream buf; + buf << "Bad shit: " << error_message(future) << "\r\n"; + res.end(buf.str()); + }); +} + +using namespace __cxxabiv1; + +std::string util_demangle(std::string to_demangle) { + int status = 0; + char * buff = __cxxabiv1::__cxa_demangle(to_demangle.c_str(), NULL, NULL, &status); + std::string demangled = buff; + std::free(buff); + return demangled; +} + +void internal_server_error(const response &res, const string &msg) { + header_map headers; + headers.emplace("content-type", text_plain); + res.write_head(500, headers); + + stringstream buf; + buf << "Internal server error: " << msg << "\r\n"; + auto s = buf.str(); + + cout << s << endl; + res.end(s); +} + +int main(int argc, const char *const argv[]) { + string cassandra_cluster; + po::options_description all("Options"); + all.add_options()("cassandra-cluster", po::value(&cassandra_cluster)->default_value("127.0.0.1")); + + po::variables_map vm; + try { + auto parsed = po::parse_command_line(argc, argv, all); + po::store(parsed, vm); + po::notify(vm); + auto unrecognized = po::collect_unrecognized(parsed.options, po::include_positional); + + if (vm.count("help")) { + cerr << all << "\n"; + return EXIT_FAILURE; + } + + if (unrecognized.size()) { + cerr << "Unrecognized option: " << unrecognized.at(0) << "\n"; + return EXIT_FAILURE; + } + + } catch (po::required_option &e) { + cerr << "Missing required option: " << e.get_option_name() << endl; + cerr << all << endl; + } catch (po::unknown_option &e) { + cerr << e.what() << endl; + return EXIT_FAILURE; + } + + CassFuture *connect_future = nullptr; + CassCluster *cluster = cass_cluster_new(); + auto session = make_unique(); + + cass_cluster_set_contact_points(cluster, cassandra_cluster.c_str()); + + connect_future = cass_session_connect(session->session, cluster); + + if (cass_future_error_code(connect_future) != CASS_OK) { + string s = error_message(connect_future); + cerr << "Could not connect to Cassandra:" << s << endl; + return EXIT_FAILURE; + } + + cout << "Connected to Cassandra" << endl; + current_cassandra_session = std::move(session); + + execute_query(current_cassandra_session->session, "USE " + keyspace_name); + boost::system::error_code ec; + http2 server; + server.num_threads(4); + + server.handle("/", [](const request &req, const response &res) { + cerr << req.method() << " " << req.uri().path << endl; + + vector paths; + auto &path = req.uri().path; + boost::algorithm::split(paths, path, boost::algorithm::is_any_of("/"), boost::algorithm::token_compress_on); + + if (paths.begin()->size() == 0) { + paths.erase(paths.begin()); + } + + try { + for (auto &p : paths) { + cout << "path ->" << p << "<-" << endl; + } + + param device("device"); + if (matches(paths, "device", device)) { + if (req.method() == "GET") { + handle_device_get(req, res, device.value); + } else { + method_not_allowed(req, res); + } + } else { + res.write_head(404); + res.end("Not found :(\r\n"); + } + } catch (const exception& ex) { + internal_server_error(res, ex.what()); + } catch (const string& ex) { + internal_server_error(res, ex); + } catch (...) { + auto type = util_demangle(__cxa_current_exception_type()->name()); + internal_server_error(res, "Unknown exception, type: " + type); + } + }); + + std::cerr << "Starting server" << endl; + if (server.listen_and_serve(ec, "127.0.0.1", "3000")) { + std::cerr << "error: " << ec.message() << std::endl; + } + std::cerr << "woot?" << endl; + + return EXIT_SUCCESS; +} -- cgit v1.2.3