#include "cassandra_support.h"
#include "http_support.h"

// NOTE(review): this copy of the file had lost everything between angle
// brackets (two bare `#include` directives and several template argument
// lists). The includes below are reconstructed from what this file actually
// uses, and the template arguments are inferred from usage; confirm them
// against version control, in particular `cassandra_session`.
#include <boost/algorithm/string.hpp>
#include <boost/program_options.hpp>
#include <nghttp2/asio_http2_server.h>

#include <cxxabi.h>

#include <cstdlib>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <typeinfo>
#include <vector>

using namespace std;
using namespace nghttp2::asio_http2;
using namespace nghttp2::asio_http2::server;
using namespace trygvis::cassandra_support;
using namespace trygvis::http_support;
namespace po = boost::program_options;

// Process-wide Cassandra session; stays null until main() has created it.
static unique_ptr<cassandra_session> current_cassandra_session;
// Keyspace selected with "USE ..." right after connecting.
static string keyspace_name = "soil_moisture";

// Pre-built content-type header values shared by all responses.
const auto text_plain = header_value{"text/plain"};
const auto application_json = header_value{"application/json"};

/// Reads column `index` of `row` as a string.
/// The driver status is passed to assert_ok(); how assert_ok reports a failure
/// is defined in cassandra_support and is not visible from this file.
string read_string(const CassRow *row, const size_t index) {
    const CassValue *value = cass_row_get_column(row, index);

    const char *buf;
    size_t len;
    auto err = cass_value_get_string(value, &buf, &len);
    assert_ok("cass_value_get_string", err);

    return string(buf, len);
}

/// Reads column `index` of `row` as a 64-bit integer (status checked via assert_ok).
cass_int64_t read_value_int64(const CassRow *row, const size_t index) {
    const CassValue *value = cass_row_get_column(row, index);

    cass_int64_t data;
    auto err = cass_value_get_int64(value, &data);
    assert_ok("cass_value_get_int64", err);

    return data;
}

/// Reads column `index` of `row` as a 32-bit integer (status checked via assert_ok).
cass_int32_t read_value_int32(const CassRow *row, const size_t index) {
    const CassValue *value = cass_row_get_column(row, index);

    cass_int32_t data;
    auto err = cass_value_get_int32(value, &data);
    assert_ok("cass_value_get_int32", err);

    return data;
}

/// Renders every row of `result` (columns: device, timestamp) as the textual
/// list that handle_device_get() sends back.
///
/// NOTE(review): the output uses unquoted keys and single-quoted values, so it
/// is not strictly valid JSON even though it is served as application/json.
/// The format is kept byte-for-byte because clients may depend on it.
static string render_rows(const CassResult *result) {
    // Own the iterator so it is released even if a column read fails by throwing.
    unique_ptr<CassIterator, decltype(&cass_iterator_free)> rows{cass_iterator_from_result(result),
                                                                &cass_iterator_free};

    stringstream buf;
    buf << "[";

    bool first = true;
    while (cass_iterator_next(rows.get())) {
        const CassRow *row = cass_iterator_get_row(rows.get());

        const string d = read_string(row, 0);
        const auto timestamp = read_value_int64(row, 1);
        // auto sensors = read_list(row, 1);

        if (!first) {
            buf << ",";
        } else {
            first = false;
        }

        buf << '\n' << "  {device: '" << d << "', timestamp: '" << timestamp << "'}";
    }

    buf << '\n' << "]" << '\n';
    return buf.str();
}

/// Handles GET for a single device: queries sm_by_day for `device` and writes
/// the matching rows to `res`.
///
/// Responds 503 when there is no Cassandra session, 500 when the query future
/// reports an error, and 200 with the rendered rows otherwise.
void handle_device_get(const request &req, const response &res, string device) {
    if (!current_cassandra_session) {
        header_map headers;
        headers.emplace("content-type", text_plain);
        res.write_head(503, headers);
        res.end("No connection to database.");
        return;
    }

    cout << "handle_device_get(" << device << ");" << endl;

    cassandra_statement stmt("SELECT device, timestamp, sensors FROM sm_by_day WHERE device=? AND day IN ?", 2);
    stmt.bind_string(0, device);
    // TODO: the day range is hard-coded; it should come from the request.
    vector<string> days = {"2015-07-10", "2015-07-11", "2015-07-12", "2015-07-13",
                           "2015-07-14", "2015-07-15", "2015-07-16"};
    stmt.bind_list(1, std::move(days));

    // Only `res` is needed inside the callback, so capture exactly that. It is a
    // reference to the caller's response object, not to a local of this function.
    // NOTE(review): whether execute() invokes the callback synchronously or later
    // on another thread is not visible here; confirm `res` is still valid then.
    current_cassandra_session->execute(std::move(stmt), [&res](cassandra_future &future) {
        // Check for failure before touching the result of the future.
        if (!future.ok()) {
            header_map headers;
            headers.emplace("content-type", text_plain);
            res.write_head(500, headers);
            stringstream buf;
            buf << "Bad shit: " << future.error_message() << "\r\n";
            res.end(buf.str());
            return;
        }

        const cassandra_result result = future.result();

        // Build the whole body before sending the status line, so a failure
        // while reading rows cannot leave a half-written 200 response behind.
        const string body = render_rows(result.underlying());

        header_map headers;
        headers.emplace("content-type", application_json);
        res.write_head(200, headers);
        res.end(body + "\r\n");
    });
}

using namespace __cxxabiv1;

/// Returns the demangled form of the C++ symbol name `to_demangle`.
/// If demangling fails (__cxa_demangle returns a null pointer), the input is
/// returned unchanged instead of constructing a string from a null pointer.
std::string util_demangle(std::string to_demangle) {
    int status = 0;
    char *buff = __cxxabiv1::__cxa_demangle(to_demangle.c_str(), nullptr, nullptr, &status);
    if (buff == nullptr || status != 0) {
        std::free(buff); // free(nullptr) is a no-op
        return to_demangle;
    }

    std::string demangled = buff;
    std::free(buff);
    return demangled;
}

/// Sends a 500 text/plain response containing `msg` and echoes it to stdout.
void internal_server_error(const response &res, const string &msg) {
    header_map headers;
    headers.emplace("content-type", text_plain);
    res.write_head(500, headers);

    stringstream buf;
    buf << "Internal server error: " << msg << "\r\n";
    auto s = buf.str();
    cout << s << endl;
    res.end(s);
}

/// Log callback registered with the Cassandra driver; prints one line per
/// message to stdout. `data` is the user pointer given at registration (unused).
void on_logging_from_cassandra(const CassLogMessage *message, void *data) {
    stringstream buf;
    buf << message->time_ms << " " << cass_log_level_string(message->severity) << " " << message->file << ":"
        << message->function << ":" << message->line << ":" << message->message;
    cout << "CASSANDRA: " << buf.str() << endl;
}

/// Entry point: parses options, connects to Cassandra, then serves HTTP/2 on
/// 127.0.0.1:3000 until the listener stops.
int main(int argc, const char *const argv[]) {
    string cassandra_cluster;

    po::options_description all("Options");
    all.add_options()("help", "Show command options");
    all.add_options()("cassandra-cluster", po::value(&cassandra_cluster)->default_value("127.0.0.1"));

    po::variables_map vm;

    try {
        auto parsed = po::parse_command_line(argc, argv, all);
        po::store(parsed, vm);
        po::notify(vm);

        auto unrecognized = po::collect_unrecognized(parsed.options, po::include_positional);

        if (vm.count("help")) {
            cerr << all << endl;
            return EXIT_FAILURE;
        }

        if (!unrecognized.empty()) {
            cerr << "Unrecognized option: " << unrecognized.at(0) << endl;
            cerr << all << endl;
            return EXIT_FAILURE;
        }
    } catch (const po::required_option &e) {
        cerr << "Missing required option: " << e.get_option_name() << endl;
        cerr << all << endl;
        return EXIT_FAILURE;
    } catch (const po::unknown_option &e) {
        cerr << e.what() << endl;
        return EXIT_FAILURE;
    } catch (const po::error &e) {
        // Any other command-line problem (bad value, bad syntax, ...) used to
        // escape main() and terminate the process without a message.
        cerr << e.what() << endl;
        cerr << all << endl;
        return EXIT_FAILURE;
    }

    cass_log_set_level(CASS_LOG_INFO);
    cass_log_set_callback(on_logging_from_cassandra, nullptr);

    // The cluster object is never freed here; it lives until process exit.
    auto cluster = cass_cluster_new();
    cass_cluster_set_contact_points(cluster, cassandra_cluster.c_str());
    cass_cluster_set_num_threads_io(cluster, 1);

    current_cassandra_session = make_unique<cassandra_session>();

    {
        // Own the connect future so it is freed on both the success and failure paths.
        unique_ptr<CassFuture, decltype(&cass_future_free)> connect_future{
            cass_session_connect(current_cassandra_session->underlying(), cluster), &cass_future_free};

        if (cass_future_error_code(connect_future.get()) != CASS_OK) {
            string s = cassandra_future::error_message(connect_future.get());
            cerr << "Could not connect to Cassandra: " << s << endl;
            return EXIT_FAILURE;
        }
    }

    cout << "Connected to Cassandra" << endl;

    execute_query(current_cassandra_session->underlying(), "USE " + keyspace_name);

    boost::system::error_code ec;
    http2 server;

    server.num_threads(4);

    server.handle("/", [](const request &req, const response &res) {
        cout << req.method() << " " << req.uri().path << endl;

        vector<string> paths;
        auto &path = req.uri().path;
        boost::algorithm::split(paths, path, boost::algorithm::is_any_of("/"), boost::algorithm::token_compress_on);

        // A path starting with '/' yields an empty first token; drop it.
        if (!paths.empty() && paths.front().empty()) {
            paths.erase(paths.begin());
        }

        try {
            for (const auto &p : paths) {
                cout << "path ->" << p << "<-" << endl;
            }

            param device("device");
            if (matches(paths, "device", device)) {
                if (req.method() == "GET") {
                    handle_device_get(req, res, device.value);
                } else {
                    method_not_allowed(req, res);
                }
            } else {
                res.write_head(404);
                res.end("Not found :(\r\n");
            }
        } catch (const exception &ex) {
            internal_server_error(res, ex.what());
        } catch (const string &ex) {
            internal_server_error(res, ex);
        } catch (...) {
            // The type info pointer is checked before it is dereferenced.
            const std::type_info *info = __cxa_current_exception_type();
            const string type = info ? util_demangle(info->name()) : string("<unknown>");
            internal_server_error(res, "Unknown exception, type: " + type);
        }
    });

    cout << "Starting HTTP listener" << endl;

    if (server.listen_and_serve(ec, "127.0.0.1", "3000")) {
        // A listener that failed to start is an error, not a clean exit.
        cerr << "error: " << ec.message() << endl;
        return EXIT_FAILURE;
    }

    cout << "woot?" << endl;

    return EXIT_SUCCESS;
}