From 06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 2 Aug 2015 16:27:10 +0200 Subject: o Adding a MQTT consumer that stores the parsed record in Cassandra. --- sm-mqtt-consumer.cpp | 78 ++++++++++++++-------------------------------------- 1 file changed, 20 insertions(+), 58 deletions(-) (limited to 'sm-mqtt-consumer.cpp') diff --git a/sm-mqtt-consumer.cpp b/sm-mqtt-consumer.cpp index e1f6801..b91b663 100644 --- a/sm-mqtt-consumer.cpp +++ b/sm-mqtt-consumer.cpp @@ -1,15 +1,19 @@ #include "cassandra_support.h" -#include "mosquittopp.h" -#include "trygvis/sensor/io.h" +#include "mqtt_support.h" +#include "misc_support.h" #include -#include #include +#include + +namespace sm_mqtt_consumer { using namespace std; using namespace std::chrono; using namespace trygvis::sensor; using namespace trygvis::sensor::io; using namespace trygvis::cassandra_support; +using namespace trygvis::mqtt_support; +using namespace trygvis::misc_support; using namespace boost; namespace po = boost::program_options; @@ -53,29 +57,12 @@ struct device_measurement { } }; -string &&to_string(CassFuture *f) { - const char *message; - size_t message_length; - cass_future_error_message(f, &message, &message_length); - - return std::move(string(message, message_length)); -} - -class mqtt_lib { -public: - mqtt_lib() { - mosquitto_lib_init(); - } - - ~mqtt_lib() { - mosquitto_lib_cleanup(); - } -}; - class mqtt_client : private mosqpp::mosquittopp { public: - mqtt_client(std::function on_message_) : mosquittopp(), - on_message_(on_message_) { + typedef std::function callback_t; + + mqtt_client(callback_t on_message_) : mosquittopp(), + on_message_(on_message_) { cout << "Connecting to " << mqtt_broker_host << ":" << mqtt_broker_port << endl; loop_start(); connect_async(mqtt_broker_host.c_str(), mqtt_broker_port, 10); @@ -87,7 +74,7 @@ public: } private: - std::function on_message_; + callback_t on_message_; bool subscribed = false; void on_connect(int rc) override { @@ -132,10 +119,6 @@ private: } }; -void print_error(CassFuture *future) { - cout << "Cassandra error: " << error_message(future) << endl; -} - auto insert_into_sm_by_day(CassSession *session, device_measurement &&measurement) { cassandra_statement q("INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);", 4); @@ -161,33 +144,6 @@ auto insert_into_sm_by_day(CassSession *session, device_measurement &&measuremen return cass_session_execute(session, q); } -template -boost::optional map(boost::optional &a, std::function f) { - if (!a.is_initialized()) { - return boost::none; - } - - return make_optional(f(a)); -} - -template -boost::optional flat_map(boost::optional &a, boost::optional (&f)(Source)) { - if (!a.is_initialized()) { - return boost::none; - } - - return f(a.get()); -} - -template -boost::optional l_c(const Source source) { - try { - return boost::lexical_cast(source); - } catch (bad_lexical_cast &e) { - return boost::none; - } -}; - void on_message(const struct mosquitto_message *message) { string payload((const char *) message->payload, (size_t) message->payloadlen); cout << "storing message: " << endl; @@ -255,7 +211,7 @@ void on_message(const struct mosquitto_message *message) { handle_future(insert_into_sm_by_day((CassSession*) current_cassandra_session.get(), std::move(measurement)), [&](auto future) { cout << "Success!" << endl; }, [&](auto future, auto err) { - cout << "Failure: " << error_message(future) << endl; + cout << "Failure: " << cassandra_future::error_message(future) << endl; }); } else { cout << "Not connected to Cassandra" << endl; @@ -306,7 +262,7 @@ int main(int argc, const char **argv) { connect_future = cass_session_connect((CassSession*) session.get(), cluster); if (cass_future_error_code(connect_future) != CASS_OK) { - string s = to_string(connect_future); + string s = cassandra_future::error_message(connect_future); cerr << "Could not connect to Cassandra:" << s << endl; return EXIT_FAILURE; } @@ -326,3 +282,9 @@ int main(int argc, const char **argv) { return 0; } + +} + +int main(int argc, const char **argv) { + return sm_mqtt_consumer::main(argc, argv); +} -- cgit v1.2.3