#include "mosquittopp.h" #include "cassandra.h" #include "trygvis/sensor/io.h" #include #include #include using namespace std; using namespace std::chrono; using namespace trygvis::sensor; using namespace trygvis::sensor::io; using namespace boost; namespace po = boost::program_options; static bool should_run; static string mqtt_broker_host; static auto mqtt_broker_port = 1883; static auto queue_name = "/trygvis"; static string keyspace_name = "soil_moisture"; string &&to_string(CassFuture *f) { const char *message; size_t message_length; cass_future_error_message(f, &message, &message_length); return std::move(string(message, message_length)); } class mqtt_lib { public: mqtt_lib() { mosquitto_lib_init(); } ~mqtt_lib() { mosquitto_lib_cleanup(); } }; class mqtt_client : private mosqpp::mosquittopp { public: mqtt_client(std::function on_message_) : mosquittopp(), on_message_(on_message_) { cout << "Connecting to " << mqtt_broker_host << ":" << mqtt_broker_port << endl; loop_start(); connect_async(mqtt_broker_host.c_str(), mqtt_broker_port, 10); } ~mqtt_client() { loop_stop(true); disconnect(); } private: std::function on_message_; bool subscribed = false; void on_connect(int rc) override { cout << "Connected to MQTT broker, rc=" << rc << endl; // should_run = false; int qos = 0; if (!subscribed) { subscribed = true; cout << "Subscribing..." << endl; subscribe(nullptr, queue_name, qos); } } void on_disconnect(int rc) override { subscribed = false; cout << "Oops, disconnected, rc=" << rc << endl; } void on_publish(int mid) override { } void on_message(const struct mosquitto_message *message) override { string payload((const char *) message->payload, (size_t) message->payloadlen); on_message_(message); } void on_subscribe(int mid, int qos_count, const int *granted_qos) override { cout << "Subscribed" << endl; } void on_unsubscribe(int mid) override { cout << "Oops, unsubscribed" << endl; } void on_log(int level, const char *str) override { cout << "MQTT: " << level << ":" << str << endl; } void on_error() override { cout << "Oops, error" << endl; } }; void print_error(CassFuture *future) { const char *message; size_t message_length; cass_future_error_message(future, &message, &message_length); string msg(message, message_length); cout << "Cassandra error: " << msg << endl; } CassError execute_query(CassSession *session, const string &&query) { CassError rc = CASS_OK; CassFuture *future = NULL; CassStatement *statement = cass_statement_new(query.c_str(), 0); future = cass_session_execute(session, statement); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { print_error(future); } cass_future_free(future); cass_statement_free(statement); return rc; } /* CREATE TABLE sm_by_day ( device text, day text, timestamp timestamp, sensors list>>, PRIMARY KEY ((device, day), timestamp) ) */ CassError insert_into_sm_by_day(CassSession *session, string payload) { CassError rc = CASS_OK; CassStatement *statement = nullptr; CassFuture *future = nullptr; auto query = "INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);"; statement = cass_statement_new(query, 4); auto device = "aa:bb:cc:dd:ee:ff"; std::time_t t = std::time(NULL); char day[100]; std::strftime(day, sizeof(day), "%Y-%M-%D", std::localtime(&t)); cass_statement_bind_string(statement, 0, device); cass_statement_bind_string(statement, 1, day); auto timestamp = std::time(NULL); cass_statement_bind_int64(statement, 2, timestamp); future = cass_session_execute(session, statement); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { print_error(future); } cass_future_free(future); cass_statement_free(statement); return rc; } struct sensor_measurement { int sensor; int value; sensor_measurement(int sensor, int value) : sensor(sensor), value(value) { } ~sensor_measurement() = default; }; struct device_measurement { string device; long timestamp; vector sensors; device_measurement(string &device, long timestamp, vector &&sensors) : device(device), timestamp(timestamp), sensors(std::move(sensors)) { }; ~device_measurement() = default; string str() { stringstream buf; buf << "device=" << device; buf << ", timestamp=" << timestamp; std::for_each(sensors.begin(), sensors.end(), [&](auto &sensor) { buf << ", #" << sensor.sensor << "=" + sensor.value; }); return buf.str(); } }; template boost::optional lexical_cast_optional(boost::optional &a) { if (!a.is_initialized()) { return boost::none; } try { return boost::lexical_cast(a); } catch (bad_lexical_cast &e) { return boost::none; } } void on_message(const struct mosquitto_message *message) { string payload((const char *) message->payload, (size_t) message->payloadlen); cout << "storing message: " << endl; cout << payload << endl; cout << "----------------------------------------" << endl; KeyDictionary dict; auto sample_buffer = make_shared(); // auto parser = open_sample_stream_parser(sample_buffer, dict); auto input = make_shared(sample_buffer, dict); mutable_buffers_1 buffer = boost::asio::buffer(message->payload, (std::size_t) message->payloadlen); input->process(buffer); input->finish(); cout << "sample_buffer->samples: " << sample_buffer->samples.size() << endl; auto device_key = dict.indexOf("device"); auto timestamp_key = dict.indexOf("timestamp"); std::for_each(sample_buffer->samples.cbegin(), sample_buffer->samples.cend(), [&](auto &sample) { cout << "Sample: " << sample.to_string() << endl; auto deviceO = sample.at(device_key); auto timestampS = sample.at(timestamp_key); if (!deviceO) { cout << "Missing required key 'device'" << endl; } if (!timestampS) { cout << "Missing required key 'timestamp'" << endl; } auto device = deviceO.get(); auto timestamp = lexical_cast_optional(timestampS); if (!timestamp) { cout << "Invalid value for 'timestamp'" << endl; } vector sensors; for (int i = 0; i < 10; i++) { auto sensorS = sample.at(dict.indexOf("sensor" + to_string(i))); auto valueS = sample.at(dict.indexOf("value" + to_string(i))); auto sensor = lexical_cast_optional(sensorS); auto value = lexical_cast_optional(valueS); if (!sensor || !value) { continue; } sensors.emplace_back(sensor.get(), value.get()); } if (sensors.size() == 0) { return; } device_measurement measurement(device, timestamp.get(), std::move(sensors)); cout << "Measurement: " << measurement.str() << endl; }); } int main(int argc, const char **argv) { mqtt_lib mqtt_lib(); string cassandra_cluster; po::options_description all("Options"); all.add_options()("cassandra-cluster", po::value(&cassandra_cluster)->default_value("127.0.0.1")); all.add_options()("mqtt-broker-host", po::value(&mqtt_broker_host)->default_value("trygvis.io")); po::variables_map vm; try { auto parsed = po::parse_command_line(argc, argv, all); po::store(parsed, vm); po::notify(vm); auto unrecognized = po::collect_unrecognized(parsed.options, po::include_positional); if (vm.count("help")) { cerr << all << "\n"; return EXIT_FAILURE; } if (unrecognized.size()) { cerr << "Unrecognized option: " << unrecognized.at(0) << "\n"; return EXIT_FAILURE; } } catch (po::required_option &e) { cerr << "Missing required option: " << e.get_option_name() << endl; cerr << all << endl; } catch (po::unknown_option &e) { cerr << e.what() << endl; return EXIT_FAILURE; } mqtt_client mqtt_client(on_message); CassFuture *connect_future = nullptr; CassCluster *cluster = cass_cluster_new(); CassSession *session = cass_session_new(); cass_cluster_set_contact_points(cluster, cassandra_cluster.c_str()); connect_future = cass_session_connect(session, cluster); if (cass_future_error_code(connect_future) != CASS_OK) { string s = to_string(connect_future); cerr << "Could not connect to Cassandra:" << s << endl; } cout << "Connected to Cassandra" << endl; execute_query(session, "USE " + keyspace_name); should_run = true; while (should_run) { cout << "sleeping.." << endl; std::this_thread::sleep_for(60s); } return 0; }