From d77ebb924c1eeca345bbb3f1eeb2df3058a52a18 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Wed, 15 Jul 2015 20:03:18 +0200 Subject: o Renaming binaries. --- CMakeLists.txt | 46 ++++---- main.cpp | 328 --------------------------------------------------- sm-http-server.cpp | 238 +++++++++++++++++++++++++++++++++++++ sm-mqtt-consumer.cpp | 328 +++++++++++++++++++++++++++++++++++++++++++++++++++ sm_web_server.cpp | 238 ------------------------------------- 5 files changed, 589 insertions(+), 589 deletions(-) delete mode 100644 main.cpp create mode 100644 sm-http-server.cpp create mode 100644 sm-mqtt-consumer.cpp delete mode 100644 sm_web_server.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index b9d9691..11ede97 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,5 @@ cmake_minimum_required(VERSION 3.2) -project(mqtt_cassandra_bridge) +project(soil-moisture) find_package(Boost COMPONENTS system program_options unit_test_framework REQUIRED) @@ -44,37 +44,37 @@ ExternalProject_Add(nghttp2 # OpenSSL # TODO: proper discovery -# mqtt_cassandra_bridge -add_executable(mqtt_cassandra_bridge main.cpp cassandra_support.h) -add_dependencies(mqtt_cassandra_bridge cpp-driver) +# sm-mqtt-consumer +add_executable(sm-mqtt-consumer sm-mqtt-consumer.cpp cassandra_support.h) +add_dependencies(sm-mqtt-consumer cpp-driver) ## Boost -target_link_libraries(mqtt_cassandra_bridge PRIVATE ${Boost_LIBRARIES}) +target_link_libraries(sm-mqtt-consumer PRIVATE ${Boost_LIBRARIES}) ## Cassandra -target_include_directories(mqtt_cassandra_bridge PRIVATE ${CPP_DRIVER}/include) -target_link_libraries(mqtt_cassandra_bridge PRIVATE ${CPP_DRIVER}/lib/libcassandra.so) +target_include_directories(sm-mqtt-consumer PRIVATE ${CPP_DRIVER}/include) +target_link_libraries(sm-mqtt-consumer PRIVATE ${CPP_DRIVER}/lib/libcassandra.so) ## Mosquitto -target_compile_options(mqtt_cassandra_bridge PUBLIC ${SHARED_COMPILE_OPTIONS}) -target_link_libraries(mqtt_cassandra_bridge PRIVATE mosquitto mosquittopp) +target_compile_options(sm-mqtt-consumer PUBLIC ${SHARED_COMPILE_OPTIONS}) +target_link_libraries(sm-mqtt-consumer PRIVATE mosquitto mosquittopp) ## Ble toys -add_dependencies(mqtt_cassandra_bridge ble-toys) -target_include_directories(mqtt_cassandra_bridge PRIVATE ${BLE_TOYS}/include) -target_link_libraries(mqtt_cassandra_bridge PRIVATE ${BLE_TOYS}/lib/trygvis/libtrygvis-sensor.a) +add_dependencies(sm-mqtt-consumer ble-toys) +target_include_directories(sm-mqtt-consumer PRIVATE ${BLE_TOYS}/include) +target_link_libraries(sm-mqtt-consumer PRIVATE ${BLE_TOYS}/lib/trygvis/libtrygvis-sensor.a) -# sm_web_server -add_executable(sm_web_server sm_web_server.cpp cassandra_support.h http_support.h) -target_compile_options(sm_web_server PUBLIC ${SHARED_COMPILE_OPTIONS}) +# sm-http-server +add_executable(sm-http-server sm-http-server.cpp cassandra_support.h http_support.h) +target_compile_options(sm-http-server PUBLIC ${SHARED_COMPILE_OPTIONS}) ## Boost -target_link_libraries(sm_web_server PRIVATE ${Boost_LIBRARIES}) +target_link_libraries(sm-http-server PRIVATE ${Boost_LIBRARIES}) ## Cassandra -add_dependencies(sm_web_server cpp-driver) -target_include_directories(sm_web_server PRIVATE ${CPP_DRIVER}/include) -target_link_libraries(sm_web_server PRIVATE ${CPP_DRIVER}/lib/libcassandra.so) +add_dependencies(sm-http-server cpp-driver) +target_include_directories(sm-http-server PRIVATE ${CPP_DRIVER}/include) +target_link_libraries(sm-http-server PRIVATE ${CPP_DRIVER}/lib/libcassandra.so) ## Nghttp2 -add_dependencies(sm_web_server nghttp2) -target_include_directories(sm_web_server PRIVATE ${NGHTTP2}/include) -target_link_libraries(sm_web_server PRIVATE ${NGHTTP2}/lib/libnghttp2_asio.a ${NGHTTP2}/lib/libnghttp2.a) +add_dependencies(sm-http-server nghttp2) +target_include_directories(sm-http-server PRIVATE ${NGHTTP2}/include) +target_link_libraries(sm-http-server PRIVATE ${NGHTTP2}/lib/libnghttp2_asio.a ${NGHTTP2}/lib/libnghttp2.a) ## Misc -target_link_libraries(sm_web_server PRIVATE ssl crypto pthread) +target_link_libraries(sm-http-server PRIVATE ssl crypto pthread) enable_testing() add_executable(http-tests http-tests.cpp http_support.h) diff --git a/main.cpp b/main.cpp deleted file mode 100644 index d7dbfdc..0000000 --- a/main.cpp +++ /dev/null @@ -1,328 +0,0 @@ -#include "cassandra_support.h" -#include "mosquittopp.h" -#include "trygvis/sensor/io.h" -#include -#include -#include - -using namespace std; -using namespace std::chrono; -using namespace trygvis::sensor; -using namespace trygvis::sensor::io; -using namespace trygvis::cassandra_support; -using namespace boost; -namespace po = boost::program_options; - -static bool should_run; -static string mqtt_broker_host; -static auto mqtt_broker_port = 1883; -static auto queue_name = "/trygvis"; -static string keyspace_name = "soil_moisture"; - -static unique_ptr current_cassandra_session; - -struct sensor_measurement { - int sensor; - int value; - - sensor_measurement(int sensor, int value) : sensor(sensor), value(value) { - } - - ~sensor_measurement() = default; -}; - -struct device_measurement { - string device; - long timestamp; - vector sensors; - - device_measurement(string &device, long timestamp, vector &&sensors) : - device(device), timestamp(timestamp), sensors(std::move(sensors)) { - }; - - ~device_measurement() = default; - - string str() { - stringstream buf; - buf << "device=" << device; - buf << ", timestamp=" << timestamp; - std::for_each(sensors.begin(), sensors.end(), [&](auto &sensor) { - buf << ", #" << sensor.sensor << "=" + sensor.value; - }); - return buf.str(); - } -}; - -string &&to_string(CassFuture *f) { - const char *message; - size_t message_length; - cass_future_error_message(f, &message, &message_length); - - return std::move(string(message, message_length)); -} - -class mqtt_lib { -public: - mqtt_lib() { - mosquitto_lib_init(); - } - - ~mqtt_lib() { - mosquitto_lib_cleanup(); - } -}; - -class mqtt_client : private mosqpp::mosquittopp { -public: - mqtt_client(std::function on_message_) : mosquittopp(), - on_message_(on_message_) { - cout << "Connecting to " << mqtt_broker_host << ":" << mqtt_broker_port << endl; - loop_start(); - connect_async(mqtt_broker_host.c_str(), mqtt_broker_port, 10); - } - - ~mqtt_client() { - loop_stop(true); - disconnect(); - } - -private: - std::function on_message_; - bool subscribed = false; - - void on_connect(int rc) override { - cout << "Connected to MQTT broker, rc=" << rc << endl; -// should_run = false; - int qos = 0; - if (!subscribed) { - subscribed = true; - cout << "Subscribing..." << endl; - subscribe(nullptr, queue_name, qos); - } - } - - void on_disconnect(int rc) override { - subscribed = false; - - cout << "Oops, disconnected, rc=" << rc << endl; - } - - void on_publish(int mid) override { - } - - void on_message(const struct mosquitto_message *message) override { - string payload((const char *) message->payload, (size_t) message->payloadlen); - on_message_(message); - } - - void on_subscribe(int mid, int qos_count, const int *granted_qos) override { - cout << "Subscribed" << endl; - } - - void on_unsubscribe(int mid) override { - cout << "Oops, unsubscribed" << endl; - } - - void on_log(int level, const char *str) override { - cout << "MQTT: " << level << ":" << str << endl; - } - - void on_error() override { - cout << "Oops, error" << endl; - } -}; - -void print_error(CassFuture *future) { - cout << "Cassandra error: " << error_message(future) << endl; -} - -auto insert_into_sm_by_day(CassSession *session, device_measurement &&measurement) { - cassandra_statement q("INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);", 4); - - q.bind(0, measurement.device); - - std::time_t t = measurement.timestamp; - char day[100]; - std::strftime(day, sizeof(day), "%Y-%m-%d", std::localtime(&t)); - q.bind(1, day); - - q.bind(2, measurement.timestamp * 1000); - - cassandra_collection sensors(CASS_COLLECTION_TYPE_LIST, measurement.sensors.size()); - for_each(measurement.sensors.cbegin(), measurement.sensors.cend(), [&](auto sensor) { - cassandra_tuple tuple(2); - tuple.set(0, sensor.sensor); - tuple.set(1, sensor.value); - sensors.append(std::move(tuple)); - }); - - q.bind(3, sensors); - - return cass_session_execute(session, q.statement); -} - -template -boost::optional map(boost::optional &a, std::function f) { - if (!a.is_initialized()) { - return boost::none; - } - - return make_optional(f(a)); -} - -template -boost::optional flat_map(boost::optional &a, boost::optional (&f)(Source)) { - if (!a.is_initialized()) { - return boost::none; - } - - return f(a.get()); -} - -template -boost::optional l_c(const Source source) { - try { - return boost::lexical_cast(source); - } catch (bad_lexical_cast &e) { - return boost::none; - } -}; - -void on_message(const struct mosquitto_message *message) { - string payload((const char *) message->payload, (size_t) message->payloadlen); - cout << "storing message: " << endl; - cout << payload << endl; - cout << "----------------------------------------" << endl; - - KeyDictionary dict; - auto sample_buffer = make_shared(); - auto input = make_shared(sample_buffer, dict); - - mutable_buffers_1 buffer = boost::asio::buffer(message->payload, (std::size_t) message->payloadlen); - - input->process(buffer); - input->finish(); - - cout << "sample_buffer->samples: " << sample_buffer->samples.size() << endl; - - auto device_key = dict.indexOf("device"); - auto timestamp_key = dict.indexOf("timestamp"); - - std::for_each(sample_buffer->samples.cbegin(), sample_buffer->samples.cend(), [&](auto &sample) { - cout << "Sample: " << sample.to_string() << endl; - - auto deviceO = sample.at(device_key); - auto timestampS = sample.at(timestamp_key); - - if (!deviceO) { - cout << "Missing required key 'device'" << endl; - } - - if (!timestampS) { - cout << "Missing required key 'timestamp'" << endl; - } - - auto device = deviceO.get(); - - auto timestamp = flat_map(timestampS, l_c); - if (!timestamp) { - cout << "Invalid value for 'timestamp'" << endl; - } - - vector sensors; - - for (int i = 0; i < 10; i++) { - auto valueS = sample.at(dict.indexOf("sensor" + to_string(i))); - - auto value = flat_map(valueS, l_c); - - if (!value) { - continue; - } - - sensors.emplace_back(i, value.get()); - } - - if (sensors.size() == 0) { - return; - } - - device_measurement measurement(device, timestamp.get(), std::move(sensors)); - - cout << "Measurement: " << measurement.str() << endl; - - if (current_cassandra_session) { - handle_future(insert_into_sm_by_day(current_cassandra_session->session, std::move(measurement)), [&](auto future) { - cout << "Success!" << endl; - }, [&](auto future, auto err) { - cout << "Failure: " << error_message(future) << endl; - }); - } else { - cout << "Not connected to Cassandra" << endl; - } - }); -} - -int main(int argc, const char **argv) { - mqtt_lib mqtt_lib(); - - string cassandra_cluster; - po::options_description all("Options"); - all.add_options()("cassandra-cluster", po::value(&cassandra_cluster)->default_value("127.0.0.1")); - all.add_options()("mqtt-broker-host", po::value(&mqtt_broker_host)->default_value("trygvis.io")); - - po::variables_map vm; - try { - auto parsed = po::parse_command_line(argc, argv, all); - po::store(parsed, vm); - po::notify(vm); - auto unrecognized = po::collect_unrecognized(parsed.options, po::include_positional); - - if (vm.count("help")) { - cerr << all << "\n"; - return EXIT_FAILURE; - } - - if (unrecognized.size()) { - cerr << "Unrecognized option: " << unrecognized.at(0) << "\n"; - return EXIT_FAILURE; - } - - } catch (po::required_option &e) { - cerr << "Missing required option: " << e.get_option_name() << endl; - cerr << all << endl; - } catch (po::unknown_option &e) { - cerr << e.what() << endl; - return EXIT_FAILURE; - } - - mqtt_client mqtt_client(on_message); - CassFuture *connect_future = nullptr; - CassCluster *cluster = cass_cluster_new(); - auto session = make_unique(); - - cass_cluster_set_contact_points(cluster, cassandra_cluster.c_str()); - - connect_future = cass_session_connect(session->session, cluster); - - if (cass_future_error_code(connect_future) != CASS_OK) { - string s = to_string(connect_future); - cerr << "Could not connect to Cassandra:" << s << endl; - return EXIT_FAILURE; - } - - cout << "Connected to Cassandra" << endl; - current_cassandra_session = std::move(session); - - execute_query(current_cassandra_session->session, "USE " + keyspace_name); - - should_run = true; - while (should_run) { - cout << "sleeping.." << endl; - std::this_thread::sleep_for(60s); - } - - current_cassandra_session.release(); - - return 0; -} diff --git a/sm-http-server.cpp b/sm-http-server.cpp new file mode 100644 index 0000000..7ac4e67 --- /dev/null +++ b/sm-http-server.cpp @@ -0,0 +1,238 @@ +#include "cassandra_support.h" +#include "http_support.h" +#include +#include +#include +#include +#include +#include +#include + +using namespace std; +using namespace nghttp2::asio_http2; +using namespace nghttp2::asio_http2::server; +using namespace trygvis::cassandra_support; +using namespace trygvis::http_support; +namespace po = boost::program_options; + +static unique_ptr current_cassandra_session; +static string keyspace_name = "soil_moisture"; + +const auto text_plain = header_value{"text/plain"}; +const auto application_json = header_value{"application/json"}; + +string read_string(const CassRow *row, const size_t index) { + const CassValue *value = cass_row_get_column(row, index); + + const char *buf; + size_t len; + auto err = cass_value_get_string(value, &buf, &len); + assert_ok("cass_value_get_string", err); + + return string(buf, len); +} + +cass_int64_t read_value_int64(const CassRow *row, const size_t index) { + const CassValue *value = cass_row_get_column(row, index); + + cass_int64_t data; + auto err = cass_value_get_int64(value, &data); + assert_ok("cass_value_get_int64", err); + + return data; +} + +cass_int32_t read_value_int32(const CassRow *row, const size_t index) { + const CassValue *value = cass_row_get_column(row, index); + + cass_int32_t data; + auto err = cass_value_get_int32(value, &data); + assert_ok("cass_value_get_int32", err); + + return data; +} + +void handle_device_get(const request &req, const response &res, string device) { + if(!current_cassandra_session) { + header_map headers; + headers.emplace("content-type", text_plain); + res.write_head(503, headers); + res.end("No connection to database."); + return; + } + + cout << "handle_device_get(" << device << ");" << endl; + + cassandra_statement stmt("SELECT device, timestamp, sensors FROM sm_by_day WHERE device=? AND day IN ?", 2); + stmt.bind(0, device); + vector days = {"2015-07-10", "2015-07-11", "2015-07-12", "2015-07-13", "2015-07-14", "2015-07-15", "2015-07-16"}; + stmt.bind(1, std::move(days)); + + auto f = cass_session_execute(current_cassandra_session->session, stmt.statement); + handle_future(f, [&](auto future) { + header_map headers; + headers.emplace("content-type", application_json); + res.write_head(200, headers); + + const CassResult *result = cass_future_get_result(future); + size_t count = cass_result_row_count(result); + cout << "row count: " << count << endl; + CassIterator *rows = cass_iterator_from_result(result); + + stringstream buf; + buf << "["; + + bool first = true; + while (cass_iterator_next(rows)) { + const CassRow *row = cass_iterator_get_row(rows); + + string d = read_string(row, 0); + auto timestamp = read_value_int64(row, 1); +// auto sensors = read_string(row, 1); + int value = -1; + + if (!first) { + buf << ","; + } else { + first = false; + } + buf << endl << " {device: '" << d << "', timestamp: '" << timestamp << "'}"; + } + + buf << endl << "]" << endl; + + cass_result_free(result); + cass_iterator_free(rows); + + res.end(buf.str() + "\r\n"); + }, [&](auto future, auto err) { + header_map headers; + headers.emplace("content-type", text_plain); + res.write_head(500, headers); + + stringstream buf; + buf << "Bad shit: " << error_message(future) << "\r\n"; + res.end(buf.str()); + }); +} + +using namespace __cxxabiv1; + +std::string util_demangle(std::string to_demangle) { + int status = 0; + char * buff = __cxxabiv1::__cxa_demangle(to_demangle.c_str(), NULL, NULL, &status); + std::string demangled = buff; + std::free(buff); + return demangled; +} + +void internal_server_error(const response &res, const string &msg) { + header_map headers; + headers.emplace("content-type", text_plain); + res.write_head(500, headers); + + stringstream buf; + buf << "Internal server error: " << msg << "\r\n"; + auto s = buf.str(); + + cout << s << endl; + res.end(s); +} + +int main(int argc, const char *const argv[]) { + string cassandra_cluster; + po::options_description all("Options"); + all.add_options()("cassandra-cluster", po::value(&cassandra_cluster)->default_value("127.0.0.1")); + + po::variables_map vm; + try { + auto parsed = po::parse_command_line(argc, argv, all); + po::store(parsed, vm); + po::notify(vm); + auto unrecognized = po::collect_unrecognized(parsed.options, po::include_positional); + + if (vm.count("help")) { + cerr << all << "\n"; + return EXIT_FAILURE; + } + + if (unrecognized.size()) { + cerr << "Unrecognized option: " << unrecognized.at(0) << "\n"; + return EXIT_FAILURE; + } + + } catch (po::required_option &e) { + cerr << "Missing required option: " << e.get_option_name() << endl; + cerr << all << endl; + } catch (po::unknown_option &e) { + cerr << e.what() << endl; + return EXIT_FAILURE; + } + + CassFuture *connect_future = nullptr; + CassCluster *cluster = cass_cluster_new(); + auto session = make_unique(); + + cass_cluster_set_contact_points(cluster, cassandra_cluster.c_str()); + + connect_future = cass_session_connect(session->session, cluster); + + if (cass_future_error_code(connect_future) != CASS_OK) { + string s = error_message(connect_future); + cerr << "Could not connect to Cassandra:" << s << endl; + return EXIT_FAILURE; + } + + cout << "Connected to Cassandra" << endl; + current_cassandra_session = std::move(session); + + execute_query(current_cassandra_session->session, "USE " + keyspace_name); + boost::system::error_code ec; + http2 server; + server.num_threads(4); + + server.handle("/", [](const request &req, const response &res) { + cerr << req.method() << " " << req.uri().path << endl; + + vector paths; + auto &path = req.uri().path; + boost::algorithm::split(paths, path, boost::algorithm::is_any_of("/"), boost::algorithm::token_compress_on); + + if (paths.begin()->size() == 0) { + paths.erase(paths.begin()); + } + + try { + for (auto &p : paths) { + cout << "path ->" << p << "<-" << endl; + } + + param device("device"); + if (matches(paths, "device", device)) { + if (req.method() == "GET") { + handle_device_get(req, res, device.value); + } else { + method_not_allowed(req, res); + } + } else { + res.write_head(404); + res.end("Not found :(\r\n"); + } + } catch (const exception& ex) { + internal_server_error(res, ex.what()); + } catch (const string& ex) { + internal_server_error(res, ex); + } catch (...) { + auto type = util_demangle(__cxa_current_exception_type()->name()); + internal_server_error(res, "Unknown exception, type: " + type); + } + }); + + std::cerr << "Starting server" << endl; + if (server.listen_and_serve(ec, "127.0.0.1", "3000")) { + std::cerr << "error: " << ec.message() << std::endl; + } + std::cerr << "woot?" << endl; + + return EXIT_SUCCESS; +} diff --git a/sm-mqtt-consumer.cpp b/sm-mqtt-consumer.cpp new file mode 100644 index 0000000..d7dbfdc --- /dev/null +++ b/sm-mqtt-consumer.cpp @@ -0,0 +1,328 @@ +#include "cassandra_support.h" +#include "mosquittopp.h" +#include "trygvis/sensor/io.h" +#include +#include +#include + +using namespace std; +using namespace std::chrono; +using namespace trygvis::sensor; +using namespace trygvis::sensor::io; +using namespace trygvis::cassandra_support; +using namespace boost; +namespace po = boost::program_options; + +static bool should_run; +static string mqtt_broker_host; +static auto mqtt_broker_port = 1883; +static auto queue_name = "/trygvis"; +static string keyspace_name = "soil_moisture"; + +static unique_ptr current_cassandra_session; + +struct sensor_measurement { + int sensor; + int value; + + sensor_measurement(int sensor, int value) : sensor(sensor), value(value) { + } + + ~sensor_measurement() = default; +}; + +struct device_measurement { + string device; + long timestamp; + vector sensors; + + device_measurement(string &device, long timestamp, vector &&sensors) : + device(device), timestamp(timestamp), sensors(std::move(sensors)) { + }; + + ~device_measurement() = default; + + string str() { + stringstream buf; + buf << "device=" << device; + buf << ", timestamp=" << timestamp; + std::for_each(sensors.begin(), sensors.end(), [&](auto &sensor) { + buf << ", #" << sensor.sensor << "=" + sensor.value; + }); + return buf.str(); + } +}; + +string &&to_string(CassFuture *f) { + const char *message; + size_t message_length; + cass_future_error_message(f, &message, &message_length); + + return std::move(string(message, message_length)); +} + +class mqtt_lib { +public: + mqtt_lib() { + mosquitto_lib_init(); + } + + ~mqtt_lib() { + mosquitto_lib_cleanup(); + } +}; + +class mqtt_client : private mosqpp::mosquittopp { +public: + mqtt_client(std::function on_message_) : mosquittopp(), + on_message_(on_message_) { + cout << "Connecting to " << mqtt_broker_host << ":" << mqtt_broker_port << endl; + loop_start(); + connect_async(mqtt_broker_host.c_str(), mqtt_broker_port, 10); + } + + ~mqtt_client() { + loop_stop(true); + disconnect(); + } + +private: + std::function on_message_; + bool subscribed = false; + + void on_connect(int rc) override { + cout << "Connected to MQTT broker, rc=" << rc << endl; +// should_run = false; + int qos = 0; + if (!subscribed) { + subscribed = true; + cout << "Subscribing..." << endl; + subscribe(nullptr, queue_name, qos); + } + } + + void on_disconnect(int rc) override { + subscribed = false; + + cout << "Oops, disconnected, rc=" << rc << endl; + } + + void on_publish(int mid) override { + } + + void on_message(const struct mosquitto_message *message) override { + string payload((const char *) message->payload, (size_t) message->payloadlen); + on_message_(message); + } + + void on_subscribe(int mid, int qos_count, const int *granted_qos) override { + cout << "Subscribed" << endl; + } + + void on_unsubscribe(int mid) override { + cout << "Oops, unsubscribed" << endl; + } + + void on_log(int level, const char *str) override { + cout << "MQTT: " << level << ":" << str << endl; + } + + void on_error() override { + cout << "Oops, error" << endl; + } +}; + +void print_error(CassFuture *future) { + cout << "Cassandra error: " << error_message(future) << endl; +} + +auto insert_into_sm_by_day(CassSession *session, device_measurement &&measurement) { + cassandra_statement q("INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);", 4); + + q.bind(0, measurement.device); + + std::time_t t = measurement.timestamp; + char day[100]; + std::strftime(day, sizeof(day), "%Y-%m-%d", std::localtime(&t)); + q.bind(1, day); + + q.bind(2, measurement.timestamp * 1000); + + cassandra_collection sensors(CASS_COLLECTION_TYPE_LIST, measurement.sensors.size()); + for_each(measurement.sensors.cbegin(), measurement.sensors.cend(), [&](auto sensor) { + cassandra_tuple tuple(2); + tuple.set(0, sensor.sensor); + tuple.set(1, sensor.value); + sensors.append(std::move(tuple)); + }); + + q.bind(3, sensors); + + return cass_session_execute(session, q.statement); +} + +template +boost::optional map(boost::optional &a, std::function f) { + if (!a.is_initialized()) { + return boost::none; + } + + return make_optional(f(a)); +} + +template +boost::optional flat_map(boost::optional &a, boost::optional (&f)(Source)) { + if (!a.is_initialized()) { + return boost::none; + } + + return f(a.get()); +} + +template +boost::optional l_c(const Source source) { + try { + return boost::lexical_cast(source); + } catch (bad_lexical_cast &e) { + return boost::none; + } +}; + +void on_message(const struct mosquitto_message *message) { + string payload((const char *) message->payload, (size_t) message->payloadlen); + cout << "storing message: " << endl; + cout << payload << endl; + cout << "----------------------------------------" << endl; + + KeyDictionary dict; + auto sample_buffer = make_shared(); + auto input = make_shared(sample_buffer, dict); + + mutable_buffers_1 buffer = boost::asio::buffer(message->payload, (std::size_t) message->payloadlen); + + input->process(buffer); + input->finish(); + + cout << "sample_buffer->samples: " << sample_buffer->samples.size() << endl; + + auto device_key = dict.indexOf("device"); + auto timestamp_key = dict.indexOf("timestamp"); + + std::for_each(sample_buffer->samples.cbegin(), sample_buffer->samples.cend(), [&](auto &sample) { + cout << "Sample: " << sample.to_string() << endl; + + auto deviceO = sample.at(device_key); + auto timestampS = sample.at(timestamp_key); + + if (!deviceO) { + cout << "Missing required key 'device'" << endl; + } + + if (!timestampS) { + cout << "Missing required key 'timestamp'" << endl; + } + + auto device = deviceO.get(); + + auto timestamp = flat_map(timestampS, l_c); + if (!timestamp) { + cout << "Invalid value for 'timestamp'" << endl; + } + + vector sensors; + + for (int i = 0; i < 10; i++) { + auto valueS = sample.at(dict.indexOf("sensor" + to_string(i))); + + auto value = flat_map(valueS, l_c); + + if (!value) { + continue; + } + + sensors.emplace_back(i, value.get()); + } + + if (sensors.size() == 0) { + return; + } + + device_measurement measurement(device, timestamp.get(), std::move(sensors)); + + cout << "Measurement: " << measurement.str() << endl; + + if (current_cassandra_session) { + handle_future(insert_into_sm_by_day(current_cassandra_session->session, std::move(measurement)), [&](auto future) { + cout << "Success!" << endl; + }, [&](auto future, auto err) { + cout << "Failure: " << error_message(future) << endl; + }); + } else { + cout << "Not connected to Cassandra" << endl; + } + }); +} + +int main(int argc, const char **argv) { + mqtt_lib mqtt_lib(); + + string cassandra_cluster; + po::options_description all("Options"); + all.add_options()("cassandra-cluster", po::value(&cassandra_cluster)->default_value("127.0.0.1")); + all.add_options()("mqtt-broker-host", po::value(&mqtt_broker_host)->default_value("trygvis.io")); + + po::variables_map vm; + try { + auto parsed = po::parse_command_line(argc, argv, all); + po::store(parsed, vm); + po::notify(vm); + auto unrecognized = po::collect_unrecognized(parsed.options, po::include_positional); + + if (vm.count("help")) { + cerr << all << "\n"; + return EXIT_FAILURE; + } + + if (unrecognized.size()) { + cerr << "Unrecognized option: " << unrecognized.at(0) << "\n"; + return EXIT_FAILURE; + } + + } catch (po::required_option &e) { + cerr << "Missing required option: " << e.get_option_name() << endl; + cerr << all << endl; + } catch (po::unknown_option &e) { + cerr << e.what() << endl; + return EXIT_FAILURE; + } + + mqtt_client mqtt_client(on_message); + CassFuture *connect_future = nullptr; + CassCluster *cluster = cass_cluster_new(); + auto session = make_unique(); + + cass_cluster_set_contact_points(cluster, cassandra_cluster.c_str()); + + connect_future = cass_session_connect(session->session, cluster); + + if (cass_future_error_code(connect_future) != CASS_OK) { + string s = to_string(connect_future); + cerr << "Could not connect to Cassandra:" << s << endl; + return EXIT_FAILURE; + } + + cout << "Connected to Cassandra" << endl; + current_cassandra_session = std::move(session); + + execute_query(current_cassandra_session->session, "USE " + keyspace_name); + + should_run = true; + while (should_run) { + cout << "sleeping.." << endl; + std::this_thread::sleep_for(60s); + } + + current_cassandra_session.release(); + + return 0; +} diff --git a/sm_web_server.cpp b/sm_web_server.cpp deleted file mode 100644 index 7ac4e67..0000000 --- a/sm_web_server.cpp +++ /dev/null @@ -1,238 +0,0 @@ -#include "cassandra_support.h" -#include "http_support.h" -#include -#include -#include -#include -#include -#include -#include - -using namespace std; -using namespace nghttp2::asio_http2; -using namespace nghttp2::asio_http2::server; -using namespace trygvis::cassandra_support; -using namespace trygvis::http_support; -namespace po = boost::program_options; - -static unique_ptr current_cassandra_session; -static string keyspace_name = "soil_moisture"; - -const auto text_plain = header_value{"text/plain"}; -const auto application_json = header_value{"application/json"}; - -string read_string(const CassRow *row, const size_t index) { - const CassValue *value = cass_row_get_column(row, index); - - const char *buf; - size_t len; - auto err = cass_value_get_string(value, &buf, &len); - assert_ok("cass_value_get_string", err); - - return string(buf, len); -} - -cass_int64_t read_value_int64(const CassRow *row, const size_t index) { - const CassValue *value = cass_row_get_column(row, index); - - cass_int64_t data; - auto err = cass_value_get_int64(value, &data); - assert_ok("cass_value_get_int64", err); - - return data; -} - -cass_int32_t read_value_int32(const CassRow *row, const size_t index) { - const CassValue *value = cass_row_get_column(row, index); - - cass_int32_t data; - auto err = cass_value_get_int32(value, &data); - assert_ok("cass_value_get_int32", err); - - return data; -} - -void handle_device_get(const request &req, const response &res, string device) { - if(!current_cassandra_session) { - header_map headers; - headers.emplace("content-type", text_plain); - res.write_head(503, headers); - res.end("No connection to database."); - return; - } - - cout << "handle_device_get(" << device << ");" << endl; - - cassandra_statement stmt("SELECT device, timestamp, sensors FROM sm_by_day WHERE device=? AND day IN ?", 2); - stmt.bind(0, device); - vector days = {"2015-07-10", "2015-07-11", "2015-07-12", "2015-07-13", "2015-07-14", "2015-07-15", "2015-07-16"}; - stmt.bind(1, std::move(days)); - - auto f = cass_session_execute(current_cassandra_session->session, stmt.statement); - handle_future(f, [&](auto future) { - header_map headers; - headers.emplace("content-type", application_json); - res.write_head(200, headers); - - const CassResult *result = cass_future_get_result(future); - size_t count = cass_result_row_count(result); - cout << "row count: " << count << endl; - CassIterator *rows = cass_iterator_from_result(result); - - stringstream buf; - buf << "["; - - bool first = true; - while (cass_iterator_next(rows)) { - const CassRow *row = cass_iterator_get_row(rows); - - string d = read_string(row, 0); - auto timestamp = read_value_int64(row, 1); -// auto sensors = read_string(row, 1); - int value = -1; - - if (!first) { - buf << ","; - } else { - first = false; - } - buf << endl << " {device: '" << d << "', timestamp: '" << timestamp << "'}"; - } - - buf << endl << "]" << endl; - - cass_result_free(result); - cass_iterator_free(rows); - - res.end(buf.str() + "\r\n"); - }, [&](auto future, auto err) { - header_map headers; - headers.emplace("content-type", text_plain); - res.write_head(500, headers); - - stringstream buf; - buf << "Bad shit: " << error_message(future) << "\r\n"; - res.end(buf.str()); - }); -} - -using namespace __cxxabiv1; - -std::string util_demangle(std::string to_demangle) { - int status = 0; - char * buff = __cxxabiv1::__cxa_demangle(to_demangle.c_str(), NULL, NULL, &status); - std::string demangled = buff; - std::free(buff); - return demangled; -} - -void internal_server_error(const response &res, const string &msg) { - header_map headers; - headers.emplace("content-type", text_plain); - res.write_head(500, headers); - - stringstream buf; - buf << "Internal server error: " << msg << "\r\n"; - auto s = buf.str(); - - cout << s << endl; - res.end(s); -} - -int main(int argc, const char *const argv[]) { - string cassandra_cluster; - po::options_description all("Options"); - all.add_options()("cassandra-cluster", po::value(&cassandra_cluster)->default_value("127.0.0.1")); - - po::variables_map vm; - try { - auto parsed = po::parse_command_line(argc, argv, all); - po::store(parsed, vm); - po::notify(vm); - auto unrecognized = po::collect_unrecognized(parsed.options, po::include_positional); - - if (vm.count("help")) { - cerr << all << "\n"; - return EXIT_FAILURE; - } - - if (unrecognized.size()) { - cerr << "Unrecognized option: " << unrecognized.at(0) << "\n"; - return EXIT_FAILURE; - } - - } catch (po::required_option &e) { - cerr << "Missing required option: " << e.get_option_name() << endl; - cerr << all << endl; - } catch (po::unknown_option &e) { - cerr << e.what() << endl; - return EXIT_FAILURE; - } - - CassFuture *connect_future = nullptr; - CassCluster *cluster = cass_cluster_new(); - auto session = make_unique(); - - cass_cluster_set_contact_points(cluster, cassandra_cluster.c_str()); - - connect_future = cass_session_connect(session->session, cluster); - - if (cass_future_error_code(connect_future) != CASS_OK) { - string s = error_message(connect_future); - cerr << "Could not connect to Cassandra:" << s << endl; - return EXIT_FAILURE; - } - - cout << "Connected to Cassandra" << endl; - current_cassandra_session = std::move(session); - - execute_query(current_cassandra_session->session, "USE " + keyspace_name); - boost::system::error_code ec; - http2 server; - server.num_threads(4); - - server.handle("/", [](const request &req, const response &res) { - cerr << req.method() << " " << req.uri().path << endl; - - vector paths; - auto &path = req.uri().path; - boost::algorithm::split(paths, path, boost::algorithm::is_any_of("/"), boost::algorithm::token_compress_on); - - if (paths.begin()->size() == 0) { - paths.erase(paths.begin()); - } - - try { - for (auto &p : paths) { - cout << "path ->" << p << "<-" << endl; - } - - param device("device"); - if (matches(paths, "device", device)) { - if (req.method() == "GET") { - handle_device_get(req, res, device.value); - } else { - method_not_allowed(req, res); - } - } else { - res.write_head(404); - res.end("Not found :(\r\n"); - } - } catch (const exception& ex) { - internal_server_error(res, ex.what()); - } catch (const string& ex) { - internal_server_error(res, ex); - } catch (...) { - auto type = util_demangle(__cxa_current_exception_type()->name()); - internal_server_error(res, "Unknown exception, type: " + type); - } - }); - - std::cerr << "Starting server" << endl; - if (server.listen_and_serve(ec, "127.0.0.1", "3000")) { - std::cerr << "error: " << ec.message() << std::endl; - } - std::cerr << "woot?" << endl; - - return EXIT_SUCCESS; -} -- cgit v1.2.3