diff options
Diffstat (limited to 'main.cpp')
-rw-r--r-- | main.cpp | 190 |
1 files changed, 98 insertions, 92 deletions
@@ -1,5 +1,5 @@ +#include "cassandra_support.h" #include "mosquittopp.h" -#include "cassandra.h" #include "trygvis/sensor/io.h" #include <thread> #include <boost/lexical_cast.hpp> @@ -9,6 +9,7 @@ using namespace std; using namespace std::chrono; using namespace trygvis::sensor; using namespace trygvis::sensor::io; +using namespace trygvis::cassandra_support; using namespace boost; namespace po = boost::program_options; @@ -18,6 +19,40 @@ static auto mqtt_broker_port = 1883; static auto queue_name = "/trygvis"; static string keyspace_name = "soil_moisture"; +static unique_ptr<cassandra_session> current_cassandra_session; + +struct sensor_measurement { + int sensor; + int value; + + sensor_measurement(int sensor, int value) : sensor(sensor), value(value) { + } + + ~sensor_measurement() = default; +}; + +struct device_measurement { + string device; + long timestamp; + vector<sensor_measurement> sensors; + + device_measurement(string &device, long timestamp, vector<sensor_measurement> &&sensors) : + device(device), timestamp(timestamp), sensors(std::move(sensors)) { + }; + + ~device_measurement() = default; + + string str() { + stringstream buf; + buf << "device=" << device; + buf << ", timestamp=" << timestamp; + std::for_each(sensors.begin(), sensors.end(), [&](auto &sensor) { + buf << ", #" << sensor.sensor << "=" + sensor.value; + }); + return buf.str(); + } +}; + string &&to_string(CassFuture *f) { const char *message; size_t message_length; @@ -98,30 +133,7 @@ private: }; void print_error(CassFuture *future) { - const char *message; - size_t message_length; - cass_future_error_message(future, &message, &message_length); - string msg(message, message_length); - cout << "Cassandra error: " << msg << endl; -} - -CassError execute_query(CassSession *session, const string &&query) { - CassError rc = CASS_OK; - CassFuture *future = NULL; - CassStatement *statement = cass_statement_new(query.c_str(), 0); - - future = cass_session_execute(session, statement); - cass_future_wait(future); - - rc = cass_future_error_code(future); - if (rc != CASS_OK) { - print_error(future); - } - - cass_future_free(future); - cass_statement_free(statement); - - return rc; + cout << "Cassandra error: " << error_message(future) << endl; } /* @@ -133,84 +145,67 @@ CassError execute_query(CassSession *session, const string &&query) { PRIMARY KEY ((device, day), timestamp) ) */ +CassError insert_into_sm_by_day(CassSession *session, device_measurement &&measurement) { + cassandra_statement q("INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);", 4); -CassError insert_into_sm_by_day(CassSession *session, string payload) { - CassError rc = CASS_OK; - CassStatement *statement = nullptr; - CassFuture *future = nullptr; - auto query = "INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);"; - - statement = cass_statement_new(query, 4); - - auto device = "aa:bb:cc:dd:ee:ff"; + q.bind(0, measurement.device); std::time_t t = std::time(NULL); char day[100]; - std::strftime(day, sizeof(day), "%Y-%M-%D", std::localtime(&t)); - cass_statement_bind_string(statement, 0, device); - cass_statement_bind_string(statement, 1, day); + std::strftime(day, sizeof(day), "%Y-%m-%d", std::localtime(&t)); + q.bind(1, day); auto timestamp = std::time(NULL); - cass_statement_bind_int64(statement, 2, timestamp); - - future = cass_session_execute(session, statement); - cass_future_wait(future); - - rc = cass_future_error_code(future); - if (rc != CASS_OK) { - print_error(future); - } + q.bind(2, timestamp); + + cassandra_collection sensors(CASS_COLLECTION_TYPE_LIST, measurement.sensors.size()); + for_each(measurement.sensors.cbegin(), measurement.sensors.cend(), [&](auto sensor) { + cassandra_tuple tuple(2); + tuple.set(0, sensor.sensor); + tuple.set(1, sensor.value); + sensors.append_tuple(std::move(tuple)); + }); - cass_future_free(future); - cass_statement_free(statement); + q.bind(3, sensors); - return rc; + return wait_for_future(cass_session_execute(session, q.statement)); } -struct sensor_measurement { - int sensor; - int value; - - sensor_measurement(int sensor, int value) : sensor(sensor), value(value) { +template<typename Target, typename Source> +boost::optional<Target> map(boost::optional<Source> &a, std::function<Target(Source)> f) { + if (!a.is_initialized()) { + return boost::none; } - ~sensor_measurement() = default; -}; - -struct device_measurement { - string device; - long timestamp; - vector<sensor_measurement> sensors; - - device_measurement(string &device, long timestamp, vector<sensor_measurement> &&sensors) : - device(device), timestamp(timestamp), sensors(std::move(sensors)) { - }; - - ~device_measurement() = default; - - string str() { - stringstream buf; - buf << "device=" << device; - buf << ", timestamp=" << timestamp; - std::for_each(sensors.begin(), sensors.end(), [&](auto &sensor) { - buf << ", #" << sensor.sensor << "=" + sensor.value; - }); - return buf.str(); - } -}; + return make_optional(f(a)); +} template<typename Target, typename Source> -boost::optional<Target> lexical_cast_optional(boost::optional<Source> &a) { +boost::optional<Target> flat_map(boost::optional<Source> &a, boost::optional<Target> (&f)(Source)) { if (!a.is_initialized()) { return boost::none; } + return f(a.get()); +} + +//template<typename Target, typename Source> +//boost::optional<Target> flat_map(boost::optional<Source> &a, std::function<boost::optional<Target>(Source)> f) { +// if (!a.is_initialized()) { +// return boost::none; +// } +// +// return f(a.get()); +//} + +template<typename Target, typename Source = string> +boost::optional<Target> l_c(const Source source) { try { - return boost::lexical_cast<Target>(a); + return boost::lexical_cast<Target>(source); } catch (bad_lexical_cast &e) { return boost::none; } -} +}; void on_message(const struct mosquitto_message *message) { string payload((const char *) message->payload, (size_t) message->payloadlen); @@ -220,7 +215,6 @@ void on_message(const struct mosquitto_message *message) { KeyDictionary dict; auto sample_buffer = make_shared<VectorSampleOutputStream>(); -// auto parser = open_sample_stream_parser(sample_buffer, dict); auto input = make_shared<KeyValueSampleStreamParser>(sample_buffer, dict); mutable_buffers_1 buffer = boost::asio::buffer(message->payload, (std::size_t) message->payloadlen); @@ -249,7 +243,7 @@ void on_message(const struct mosquitto_message *message) { auto device = deviceO.get(); - auto timestamp = lexical_cast_optional<long>(timestampS); + auto timestamp = flat_map(timestampS, l_c<long>); if (!timestamp) { cout << "Invalid value for 'timestamp'" << endl; } @@ -257,17 +251,15 @@ void on_message(const struct mosquitto_message *message) { vector<sensor_measurement> sensors; for (int i = 0; i < 10; i++) { - auto sensorS = sample.at(dict.indexOf("sensor" + to_string(i))); - auto valueS = sample.at(dict.indexOf("value" + to_string(i))); + auto valueS = sample.at(dict.indexOf("sensor" + to_string(i))); - auto sensor = lexical_cast_optional<int>(sensorS); - auto value = lexical_cast_optional<int>(valueS); + auto value = flat_map(valueS, l_c<int>); - if (!sensor || !value) { + if (!value) { continue; } - sensors.emplace_back(sensor.get(), value.get()); + sensors.emplace_back(i, value.get()); } if (sensors.size() == 0) { @@ -277,6 +269,16 @@ void on_message(const struct mosquitto_message *message) { device_measurement measurement(device, timestamp.get(), std::move(sensors)); cout << "Measurement: " << measurement.str() << endl; + + if (current_cassandra_session) { + auto rc = insert_into_sm_by_day(current_cassandra_session->session, std::move(measurement)); + + cout << "rc=" << rc << endl; + + assert_ok("wait_for_future", rc); + } else { + cout << "Not connected to Cassandra" << endl; + } }); } @@ -316,20 +318,22 @@ int main(int argc, const char **argv) { mqtt_client mqtt_client(on_message); CassFuture *connect_future = nullptr; CassCluster *cluster = cass_cluster_new(); - CassSession *session = cass_session_new(); + auto session = make_unique<cassandra_session>(); cass_cluster_set_contact_points(cluster, cassandra_cluster.c_str()); - connect_future = cass_session_connect(session, cluster); + connect_future = cass_session_connect(session->session, cluster); if (cass_future_error_code(connect_future) != CASS_OK) { string s = to_string(connect_future); cerr << "Could not connect to Cassandra:" << s << endl; + return EXIT_FAILURE; } cout << "Connected to Cassandra" << endl; + current_cassandra_session = std::move(session); - execute_query(session, "USE " + keyspace_name); + execute_query(current_cassandra_session->session, "USE " + keyspace_name); should_run = true; while (should_run) { @@ -337,5 +341,7 @@ int main(int argc, const char **argv) { std::this_thread::sleep_for(60s); } + current_cassandra_session.release(); + return 0; } |