diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2015-07-13 01:16:34 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2015-07-13 01:16:34 +0200 |
commit | f2ff3cfcdc503be98b7d4b9f24f313c5732a0c17 (patch) | |
tree | 100ba509749344d4dec02e3703b8cf4e283c6c63 | |
parent | deb0c6cf01cb2b9994c77a6dd31341be8d1f1f4d (diff) | |
download | mqtt-cassandra-bridge-f2ff3cfcdc503be98b7d4b9f24f313c5732a0c17.tar.gz mqtt-cassandra-bridge-f2ff3cfcdc503be98b7d4b9f24f313c5732a0c17.tar.bz2 mqtt-cassandra-bridge-f2ff3cfcdc503be98b7d4b9f24f313c5732a0c17.tar.xz mqtt-cassandra-bridge-f2ff3cfcdc503be98b7d4b9f24f313c5732a0c17.zip |
o Fully functional reception from MQTT into Cassandra.
-rw-r--r-- | CMakeLists.txt | 2 | ||||
-rw-r--r-- | cassandra_support.h | 151 | ||||
-rw-r--r-- | main.cpp | 190 |
3 files changed, 250 insertions, 93 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 9c379c5..74197cb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,7 +3,7 @@ project(mqtt_cassandra_bridge) find_package(Boost COMPONENTS regex system program_options REQUIRED) -add_executable(mqtt_cassandra_bridge main.cpp) +add_executable(mqtt_cassandra_bridge main.cpp cassandra_support.h) target_link_libraries(mqtt_cassandra_bridge PUBLIC ${Boost_LIBRARIES}) target_compile_options(mqtt_cassandra_bridge PUBLIC "-std=c++14") diff --git a/cassandra_support.h b/cassandra_support.h new file mode 100644 index 0000000..167f69b --- /dev/null +++ b/cassandra_support.h @@ -0,0 +1,151 @@ +#ifndef TRYGVIS_CASSANDRA_SUPPORT_H +#define TRYGVIS_CASSANDRA_SUPPORT_H + +#include <stddef.h> +#include <string> +#include <cassandra.h> +#include <stdexcept> + +namespace trygvis { +namespace cassandra_support { + +using namespace std; + +class cassandra_error : runtime_error { +public: + cassandra_error(const string &context, CassError error) : runtime_error("Cassandra error: context=" + context + ", error=" + to_string((int)error)) { + } +}; + +string error_message(CassFuture *future) { + const char *message; + size_t message_length; + cass_future_error_message(future, &message, &message_length); + return string(message, message_length); +} + +static CassError execute_query(CassSession *session, const string &&query) { + CassStatement *statement = cass_statement_new(query.c_str(), 0); + + CassFuture *future = cass_session_execute(session, statement); + cass_future_wait(future); + + CassError rc = cass_future_error_code(future); + + cass_future_free(future); + cass_statement_free(statement); + + return rc; +} + +void assert_ok(const string &context, CassError &err) { + if (err == CASS_OK) { + return; + } + + throw cassandra_error(context, err); +} + +class cassandra_tuple { +public: + cassandra_tuple(size_t item_count) { + tuple = cass_tuple_new(item_count); + } + + ~cassandra_tuple() { + cass_tuple_free(tuple); + } + + void set(size_t i, cass_int32_t value) { + cass_tuple_set_int32(tuple, i, value); + } + + CassTuple *tuple; +}; + +class cassandra_collection { +public: + cassandra_collection(CassCollectionType type, size_t item_count) { + collection = cass_collection_new(type, item_count); + } + + ~cassandra_collection() { + cass_collection_free(collection); + } + + void append_tuple(cassandra_tuple &&tuple) { + cass_collection_append_tuple(collection, tuple.tuple); + } + + CassCollection *collection; +}; + +CassError wait_for_future(CassFuture *future) { + cass_future_wait(future); + + CassError rc = cass_future_error_code(future); + + cass_future_free(future); + + return rc; +}; + +class cassandra_statement { +public: + cassandra_statement(string q, size_t argument_count) { + statement = cass_statement_new(q.c_str(), argument_count); + }; + + ~cassandra_statement() { + cass_statement_free(statement); + } + + void bind(size_t i, const string &value) { + auto err = cass_statement_bind_string(statement, i, value.c_str()); + assert_ok("cass_statement_bind_string", err); + } + + void bind(size_t i, const char *value) { + auto err = cass_statement_bind_string(statement, i, value); + assert_ok("cass_statement_bind_string", err); + } + + void bind(size_t i, const cass_int64_t value) { + auto err = cass_statement_bind_int64(statement, i, value); + assert_ok("cass_statement_bind_int64", err); + } + + void bind(size_t i, const CassCollection *value) { + auto err = cass_statement_bind_collection(statement, i, value); + assert_ok("cass_statement_bind_collection", err); + } + + void bind(size_t i, const cassandra_collection &value) { + auto err = cass_statement_bind_collection(statement, i, value.collection); + assert_ok("cass_statement_bind_collection", err); + } + + CassStatement *statement; +}; + +class cassandra_session { +public: + cassandra_session() { + session = cass_session_new(); + } + + ~cassandra_session() { + cass_session_free(session); + } + + cassandra_session(const cassandra_session &) = delete; + + cassandra_session &operator=(const cassandra_session &) = delete; + + CassSession *session; +}; + +} +} + +#endif @@ -1,5 +1,5 @@ +#include "cassandra_support.h" #include "mosquittopp.h" -#include "cassandra.h" #include "trygvis/sensor/io.h" #include <thread> #include <boost/lexical_cast.hpp> @@ -9,6 +9,7 @@ using namespace std; using namespace std::chrono; using namespace trygvis::sensor; using namespace trygvis::sensor::io; +using namespace trygvis::cassandra_support; using namespace boost; namespace po = boost::program_options; @@ -18,6 +19,40 @@ static auto mqtt_broker_port = 1883; static auto queue_name = "/trygvis"; static string keyspace_name = "soil_moisture"; +static unique_ptr<cassandra_session> current_cassandra_session; + +struct sensor_measurement { + int sensor; + int value; + + sensor_measurement(int sensor, int value) : sensor(sensor), value(value) { + } + + ~sensor_measurement() = default; +}; + +struct device_measurement { + string device; + long timestamp; + vector<sensor_measurement> sensors; + + device_measurement(string &device, long timestamp, vector<sensor_measurement> &&sensors) : + device(device), timestamp(timestamp), sensors(std::move(sensors)) { + }; + + ~device_measurement() = default; + + string str() { + stringstream buf; + buf << "device=" << device; + buf << ", timestamp=" << timestamp; + std::for_each(sensors.begin(), sensors.end(), [&](auto &sensor) { + buf << ", #" << sensor.sensor << "=" + sensor.value; + }); + return buf.str(); + } +}; + string &&to_string(CassFuture *f) { const char *message; size_t message_length; @@ -98,30 +133,7 @@ private: }; void print_error(CassFuture *future) { - const char *message; - size_t message_length; - cass_future_error_message(future, &message, &message_length); - string msg(message, message_length); - cout << "Cassandra error: " << msg << endl; -} - -CassError execute_query(CassSession *session, const string &&query) { - CassError rc = CASS_OK; - CassFuture *future = NULL; - CassStatement *statement = cass_statement_new(query.c_str(), 0); - - future = cass_session_execute(session, statement); - cass_future_wait(future); - - rc = cass_future_error_code(future); - if (rc != CASS_OK) { - print_error(future); - } - - cass_future_free(future); - cass_statement_free(statement); - - return rc; + cout << "Cassandra error: " << error_message(future) << endl; } /* @@ -133,84 +145,67 @@ CassError execute_query(CassSession *session, const string &&query) { PRIMARY KEY ((device, day), timestamp) ) */ +CassError insert_into_sm_by_day(CassSession *session, device_measurement &&measurement) { + cassandra_statement q("INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);", 4); -CassError insert_into_sm_by_day(CassSession *session, string payload) { - CassError rc = CASS_OK; - CassStatement *statement = nullptr; - CassFuture *future = nullptr; - auto query = "INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);"; - - statement = cass_statement_new(query, 4); - - auto device = "aa:bb:cc:dd:ee:ff"; + q.bind(0, measurement.device); std::time_t t = std::time(NULL); char day[100]; - std::strftime(day, sizeof(day), "%Y-%M-%D", std::localtime(&t)); - cass_statement_bind_string(statement, 0, device); - cass_statement_bind_string(statement, 1, day); + std::strftime(day, sizeof(day), "%Y-%m-%d", std::localtime(&t)); + q.bind(1, day); auto timestamp = std::time(NULL); - cass_statement_bind_int64(statement, 2, timestamp); - - future = cass_session_execute(session, statement); - cass_future_wait(future); - - rc = cass_future_error_code(future); - if (rc != CASS_OK) { - print_error(future); - } + q.bind(2, timestamp); + + cassandra_collection sensors(CASS_COLLECTION_TYPE_LIST, measurement.sensors.size()); + for_each(measurement.sensors.cbegin(), measurement.sensors.cend(), [&](auto sensor) { + cassandra_tuple tuple(2); + tuple.set(0, sensor.sensor); + tuple.set(1, sensor.value); + sensors.append_tuple(std::move(tuple)); + }); - cass_future_free(future); - cass_statement_free(statement); + q.bind(3, sensors); - return rc; + return wait_for_future(cass_session_execute(session, q.statement)); } -struct sensor_measurement { - int sensor; - int value; - - sensor_measurement(int sensor, int value) : sensor(sensor), value(value) { +template<typename Target, typename Source> +boost::optional<Target> map(boost::optional<Source> &a, std::function<Target(Source)> f) { + if (!a.is_initialized()) { + return boost::none; } - ~sensor_measurement() = default; -}; - -struct device_measurement { - string device; - long timestamp; - vector<sensor_measurement> sensors; - - device_measurement(string &device, long timestamp, vector<sensor_measurement> &&sensors) : - device(device), timestamp(timestamp), sensors(std::move(sensors)) { - }; - - ~device_measurement() = default; - - string str() { - stringstream buf; - buf << "device=" << device; - buf << ", timestamp=" << timestamp; - std::for_each(sensors.begin(), sensors.end(), [&](auto &sensor) { - buf << ", #" << sensor.sensor << "=" + sensor.value; - }); - return buf.str(); - } -}; + return make_optional(f(a)); +} template<typename Target, typename Source> -boost::optional<Target> lexical_cast_optional(boost::optional<Source> &a) { +boost::optional<Target> flat_map(boost::optional<Source> &a, boost::optional<Target> (&f)(Source)) { if (!a.is_initialized()) { return boost::none; } + return f(a.get()); +} + +//template<typename Target, typename Source> +//boost::optional<Target> flat_map(boost::optional<Source> &a, std::function<boost::optional<Target>(Source)> f) { +// if (!a.is_initialized()) { +// return boost::none; +// } +// +// return f(a.get()); +//} + +template<typename Target, typename Source = string> +boost::optional<Target> l_c(const Source source) { try { - return boost::lexical_cast<Target>(a); + return boost::lexical_cast<Target>(source); } catch (bad_lexical_cast &e) { return boost::none; } -} +}; void on_message(const struct mosquitto_message *message) { string payload((const char *) message->payload, (size_t) message->payloadlen); @@ -220,7 +215,6 @@ void on_message(const struct mosquitto_message *message) { KeyDictionary dict; auto sample_buffer = make_shared<VectorSampleOutputStream>(); -// auto parser = open_sample_stream_parser(sample_buffer, dict); auto input = make_shared<KeyValueSampleStreamParser>(sample_buffer, dict); mutable_buffers_1 buffer = boost::asio::buffer(message->payload, (std::size_t) message->payloadlen); @@ -249,7 +243,7 @@ void on_message(const struct mosquitto_message *message) { auto device = deviceO.get(); - auto timestamp = lexical_cast_optional<long>(timestampS); + auto timestamp = flat_map(timestampS, l_c<long>); if (!timestamp) { cout << "Invalid value for 'timestamp'" << endl; } @@ -257,17 +251,15 @@ void on_message(const struct mosquitto_message *message) { vector<sensor_measurement> sensors; for (int i = 0; i < 10; i++) { - auto sensorS = sample.at(dict.indexOf("sensor" + to_string(i))); - auto valueS = sample.at(dict.indexOf("value" + to_string(i))); + auto valueS = sample.at(dict.indexOf("sensor" + to_string(i))); - auto sensor = lexical_cast_optional<int>(sensorS); - auto value = lexical_cast_optional<int>(valueS); + auto value = flat_map(valueS, l_c<int>); - if (!sensor || !value) { + if (!value) { continue; } - sensors.emplace_back(sensor.get(), value.get()); + sensors.emplace_back(i, value.get()); } if (sensors.size() == 0) { @@ -277,6 +269,16 @@ void on_message(const struct mosquitto_message *message) { device_measurement measurement(device, timestamp.get(), std::move(sensors)); cout << "Measurement: " << measurement.str() << endl; + + if (current_cassandra_session) { + auto rc = insert_into_sm_by_day(current_cassandra_session->session, std::move(measurement)); + + cout << "rc=" << rc << endl; + + assert_ok("wait_for_future", rc); + } else { + cout << "Not connected to Cassandra" << endl; + } }); } @@ -316,20 +318,22 @@ int main(int argc, const char **argv) { mqtt_client mqtt_client(on_message); CassFuture *connect_future = nullptr; CassCluster *cluster = cass_cluster_new(); - CassSession *session = cass_session_new(); + auto session = make_unique<cassandra_session>(); cass_cluster_set_contact_points(cluster, cassandra_cluster.c_str()); - connect_future = cass_session_connect(session, cluster); + connect_future = cass_session_connect(session->session, cluster); if (cass_future_error_code(connect_future) != CASS_OK) { string s = to_string(connect_future); cerr << "Could not connect to Cassandra:" << s << endl; + return EXIT_FAILURE; } cout << "Connected to Cassandra" << endl; + current_cassandra_session = std::move(session); - execute_query(session, "USE " + keyspace_name); + execute_query(current_cassandra_session->session, "USE " + keyspace_name); should_run = true; while (should_run) { @@ -337,5 +341,7 @@ int main(int argc, const char **argv) { std::this_thread::sleep_for(60s); } + current_cassandra_session.release(); + return 0; } |