From d77ebb924c1eeca345bbb3f1eeb2df3058a52a18 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Wed, 15 Jul 2015 20:03:18 +0200 Subject: o Renaming binaries. --- sm-mqtt-consumer.cpp | 328 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 328 insertions(+) create mode 100644 sm-mqtt-consumer.cpp (limited to 'sm-mqtt-consumer.cpp') diff --git a/sm-mqtt-consumer.cpp b/sm-mqtt-consumer.cpp new file mode 100644 index 0000000..d7dbfdc --- /dev/null +++ b/sm-mqtt-consumer.cpp @@ -0,0 +1,328 @@ +#include "cassandra_support.h" +#include "mosquittopp.h" +#include "trygvis/sensor/io.h" +#include +#include +#include + +using namespace std; +using namespace std::chrono; +using namespace trygvis::sensor; +using namespace trygvis::sensor::io; +using namespace trygvis::cassandra_support; +using namespace boost; +namespace po = boost::program_options; + +static bool should_run; +static string mqtt_broker_host; +static auto mqtt_broker_port = 1883; +static auto queue_name = "/trygvis"; +static string keyspace_name = "soil_moisture"; + +static unique_ptr current_cassandra_session; + +struct sensor_measurement { + int sensor; + int value; + + sensor_measurement(int sensor, int value) : sensor(sensor), value(value) { + } + + ~sensor_measurement() = default; +}; + +struct device_measurement { + string device; + long timestamp; + vector sensors; + + device_measurement(string &device, long timestamp, vector &&sensors) : + device(device), timestamp(timestamp), sensors(std::move(sensors)) { + }; + + ~device_measurement() = default; + + string str() { + stringstream buf; + buf << "device=" << device; + buf << ", timestamp=" << timestamp; + std::for_each(sensors.begin(), sensors.end(), [&](auto &sensor) { + buf << ", #" << sensor.sensor << "=" + sensor.value; + }); + return buf.str(); + } +}; + +string &&to_string(CassFuture *f) { + const char *message; + size_t message_length; + cass_future_error_message(f, &message, &message_length); + + return std::move(string(message, message_length)); +} + +class mqtt_lib { +public: + mqtt_lib() { + mosquitto_lib_init(); + } + + ~mqtt_lib() { + mosquitto_lib_cleanup(); + } +}; + +class mqtt_client : private mosqpp::mosquittopp { +public: + mqtt_client(std::function on_message_) : mosquittopp(), + on_message_(on_message_) { + cout << "Connecting to " << mqtt_broker_host << ":" << mqtt_broker_port << endl; + loop_start(); + connect_async(mqtt_broker_host.c_str(), mqtt_broker_port, 10); + } + + ~mqtt_client() { + loop_stop(true); + disconnect(); + } + +private: + std::function on_message_; + bool subscribed = false; + + void on_connect(int rc) override { + cout << "Connected to MQTT broker, rc=" << rc << endl; +// should_run = false; + int qos = 0; + if (!subscribed) { + subscribed = true; + cout << "Subscribing..." << endl; + subscribe(nullptr, queue_name, qos); + } + } + + void on_disconnect(int rc) override { + subscribed = false; + + cout << "Oops, disconnected, rc=" << rc << endl; + } + + void on_publish(int mid) override { + } + + void on_message(const struct mosquitto_message *message) override { + string payload((const char *) message->payload, (size_t) message->payloadlen); + on_message_(message); + } + + void on_subscribe(int mid, int qos_count, const int *granted_qos) override { + cout << "Subscribed" << endl; + } + + void on_unsubscribe(int mid) override { + cout << "Oops, unsubscribed" << endl; + } + + void on_log(int level, const char *str) override { + cout << "MQTT: " << level << ":" << str << endl; + } + + void on_error() override { + cout << "Oops, error" << endl; + } +}; + +void print_error(CassFuture *future) { + cout << "Cassandra error: " << error_message(future) << endl; +} + +auto insert_into_sm_by_day(CassSession *session, device_measurement &&measurement) { + cassandra_statement q("INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);", 4); + + q.bind(0, measurement.device); + + std::time_t t = measurement.timestamp; + char day[100]; + std::strftime(day, sizeof(day), "%Y-%m-%d", std::localtime(&t)); + q.bind(1, day); + + q.bind(2, measurement.timestamp * 1000); + + cassandra_collection sensors(CASS_COLLECTION_TYPE_LIST, measurement.sensors.size()); + for_each(measurement.sensors.cbegin(), measurement.sensors.cend(), [&](auto sensor) { + cassandra_tuple tuple(2); + tuple.set(0, sensor.sensor); + tuple.set(1, sensor.value); + sensors.append(std::move(tuple)); + }); + + q.bind(3, sensors); + + return cass_session_execute(session, q.statement); +} + +template +boost::optional map(boost::optional &a, std::function f) { + if (!a.is_initialized()) { + return boost::none; + } + + return make_optional(f(a)); +} + +template +boost::optional flat_map(boost::optional &a, boost::optional (&f)(Source)) { + if (!a.is_initialized()) { + return boost::none; + } + + return f(a.get()); +} + +template +boost::optional l_c(const Source source) { + try { + return boost::lexical_cast(source); + } catch (bad_lexical_cast &e) { + return boost::none; + } +}; + +void on_message(const struct mosquitto_message *message) { + string payload((const char *) message->payload, (size_t) message->payloadlen); + cout << "storing message: " << endl; + cout << payload << endl; + cout << "----------------------------------------" << endl; + + KeyDictionary dict; + auto sample_buffer = make_shared(); + auto input = make_shared(sample_buffer, dict); + + mutable_buffers_1 buffer = boost::asio::buffer(message->payload, (std::size_t) message->payloadlen); + + input->process(buffer); + input->finish(); + + cout << "sample_buffer->samples: " << sample_buffer->samples.size() << endl; + + auto device_key = dict.indexOf("device"); + auto timestamp_key = dict.indexOf("timestamp"); + + std::for_each(sample_buffer->samples.cbegin(), sample_buffer->samples.cend(), [&](auto &sample) { + cout << "Sample: " << sample.to_string() << endl; + + auto deviceO = sample.at(device_key); + auto timestampS = sample.at(timestamp_key); + + if (!deviceO) { + cout << "Missing required key 'device'" << endl; + } + + if (!timestampS) { + cout << "Missing required key 'timestamp'" << endl; + } + + auto device = deviceO.get(); + + auto timestamp = flat_map(timestampS, l_c); + if (!timestamp) { + cout << "Invalid value for 'timestamp'" << endl; + } + + vector sensors; + + for (int i = 0; i < 10; i++) { + auto valueS = sample.at(dict.indexOf("sensor" + to_string(i))); + + auto value = flat_map(valueS, l_c); + + if (!value) { + continue; + } + + sensors.emplace_back(i, value.get()); + } + + if (sensors.size() == 0) { + return; + } + + device_measurement measurement(device, timestamp.get(), std::move(sensors)); + + cout << "Measurement: " << measurement.str() << endl; + + if (current_cassandra_session) { + handle_future(insert_into_sm_by_day(current_cassandra_session->session, std::move(measurement)), [&](auto future) { + cout << "Success!" << endl; + }, [&](auto future, auto err) { + cout << "Failure: " << error_message(future) << endl; + }); + } else { + cout << "Not connected to Cassandra" << endl; + } + }); +} + +int main(int argc, const char **argv) { + mqtt_lib mqtt_lib(); + + string cassandra_cluster; + po::options_description all("Options"); + all.add_options()("cassandra-cluster", po::value(&cassandra_cluster)->default_value("127.0.0.1")); + all.add_options()("mqtt-broker-host", po::value(&mqtt_broker_host)->default_value("trygvis.io")); + + po::variables_map vm; + try { + auto parsed = po::parse_command_line(argc, argv, all); + po::store(parsed, vm); + po::notify(vm); + auto unrecognized = po::collect_unrecognized(parsed.options, po::include_positional); + + if (vm.count("help")) { + cerr << all << "\n"; + return EXIT_FAILURE; + } + + if (unrecognized.size()) { + cerr << "Unrecognized option: " << unrecognized.at(0) << "\n"; + return EXIT_FAILURE; + } + + } catch (po::required_option &e) { + cerr << "Missing required option: " << e.get_option_name() << endl; + cerr << all << endl; + } catch (po::unknown_option &e) { + cerr << e.what() << endl; + return EXIT_FAILURE; + } + + mqtt_client mqtt_client(on_message); + CassFuture *connect_future = nullptr; + CassCluster *cluster = cass_cluster_new(); + auto session = make_unique(); + + cass_cluster_set_contact_points(cluster, cassandra_cluster.c_str()); + + connect_future = cass_session_connect(session->session, cluster); + + if (cass_future_error_code(connect_future) != CASS_OK) { + string s = to_string(connect_future); + cerr << "Could not connect to Cassandra:" << s << endl; + return EXIT_FAILURE; + } + + cout << "Connected to Cassandra" << endl; + current_cassandra_session = std::move(session); + + execute_query(current_cassandra_session->session, "USE " + keyspace_name); + + should_run = true; + while (should_run) { + cout << "sleeping.." << endl; + std::this_thread::sleep_for(60s); + } + + current_cassandra_session.release(); + + return 0; +} -- cgit v1.2.3