aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2015-07-13 01:16:34 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2015-07-13 01:16:34 +0200
commitf2ff3cfcdc503be98b7d4b9f24f313c5732a0c17 (patch)
tree100ba509749344d4dec02e3703b8cf4e283c6c63
parentdeb0c6cf01cb2b9994c77a6dd31341be8d1f1f4d (diff)
downloadmqtt-cassandra-bridge-f2ff3cfcdc503be98b7d4b9f24f313c5732a0c17.tar.gz
mqtt-cassandra-bridge-f2ff3cfcdc503be98b7d4b9f24f313c5732a0c17.tar.bz2
mqtt-cassandra-bridge-f2ff3cfcdc503be98b7d4b9f24f313c5732a0c17.tar.xz
mqtt-cassandra-bridge-f2ff3cfcdc503be98b7d4b9f24f313c5732a0c17.zip
o Fully functional reception from MQTT into Cassandra.
-rw-r--r--CMakeLists.txt2
-rw-r--r--cassandra_support.h151
-rw-r--r--main.cpp190
3 files changed, 250 insertions, 93 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 9c379c5..74197cb 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -3,7 +3,7 @@ project(mqtt_cassandra_bridge)
find_package(Boost COMPONENTS regex system program_options REQUIRED)
-add_executable(mqtt_cassandra_bridge main.cpp)
+add_executable(mqtt_cassandra_bridge main.cpp cassandra_support.h)
target_link_libraries(mqtt_cassandra_bridge PUBLIC ${Boost_LIBRARIES})
target_compile_options(mqtt_cassandra_bridge PUBLIC "-std=c++14")
diff --git a/cassandra_support.h b/cassandra_support.h
new file mode 100644
index 0000000..167f69b
--- /dev/null
+++ b/cassandra_support.h
@@ -0,0 +1,151 @@
+#ifndef TRYGVIS_CASSANDRA_SUPPORT_H
+#define TRYGVIS_CASSANDRA_SUPPORT_H
+
+#include <stddef.h>
+#include <string>
+#include <cassandra.h>
+#include <stdexcept>
+
+namespace trygvis {
+namespace cassandra_support {
+
+using namespace std;
+
+class cassandra_error : runtime_error {
+public:
+ cassandra_error(const string &context, CassError error) : runtime_error("Cassandra error: context=" + context + ", error=" + to_string((int)error)) {
+ }
+};
+
+string error_message(CassFuture *future) {
+ const char *message;
+ size_t message_length;
+ cass_future_error_message(future, &message, &message_length);
+ return string(message, message_length);
+}
+
+static CassError execute_query(CassSession *session, const string &&query) {
+ CassStatement *statement = cass_statement_new(query.c_str(), 0);
+
+ CassFuture *future = cass_session_execute(session, statement);
+ cass_future_wait(future);
+
+ CassError rc = cass_future_error_code(future);
+
+ cass_future_free(future);
+ cass_statement_free(statement);
+
+ return rc;
+}
+
+void assert_ok(const string &context, CassError &err) {
+ if (err == CASS_OK) {
+ return;
+ }
+
+ throw cassandra_error(context, err);
+}
+
+class cassandra_tuple {
+public:
+ cassandra_tuple(size_t item_count) {
+ tuple = cass_tuple_new(item_count);
+ }
+
+ ~cassandra_tuple() {
+ cass_tuple_free(tuple);
+ }
+
+ void set(size_t i, cass_int32_t value) {
+ cass_tuple_set_int32(tuple, i, value);
+ }
+
+ CassTuple *tuple;
+};
+
+class cassandra_collection {
+public:
+ cassandra_collection(CassCollectionType type, size_t item_count) {
+ collection = cass_collection_new(type, item_count);
+ }
+
+ ~cassandra_collection() {
+ cass_collection_free(collection);
+ }
+
+ void append_tuple(cassandra_tuple &&tuple) {
+ cass_collection_append_tuple(collection, tuple.tuple);
+ }
+
+ CassCollection *collection;
+};
+
+CassError wait_for_future(CassFuture *future) {
+ cass_future_wait(future);
+
+ CassError rc = cass_future_error_code(future);
+
+ cass_future_free(future);
+
+ return rc;
+};
+
+class cassandra_statement {
+public:
+ cassandra_statement(string q, size_t argument_count) {
+ statement = cass_statement_new(q.c_str(), argument_count);
+ };
+
+ ~cassandra_statement() {
+ cass_statement_free(statement);
+ }
+
+ void bind(size_t i, const string &value) {
+ auto err = cass_statement_bind_string(statement, i, value.c_str());
+ assert_ok("cass_statement_bind_string", err);
+ }
+
+ void bind(size_t i, const char *value) {
+ auto err = cass_statement_bind_string(statement, i, value);
+ assert_ok("cass_statement_bind_string", err);
+ }
+
+ void bind(size_t i, const cass_int64_t value) {
+ auto err = cass_statement_bind_int64(statement, i, value);
+ assert_ok("cass_statement_bind_int64", err);
+ }
+
+ void bind(size_t i, const CassCollection *value) {
+ auto err = cass_statement_bind_collection(statement, i, value);
+ assert_ok("cass_statement_bind_collection", err);
+ }
+
+ void bind(size_t i, const cassandra_collection &value) {
+ auto err = cass_statement_bind_collection(statement, i, value.collection);
+ assert_ok("cass_statement_bind_collection", err);
+ }
+
+ CassStatement *statement;
+};
+
+class cassandra_session {
+public:
+ cassandra_session() {
+ session = cass_session_new();
+ }
+
+ ~cassandra_session() {
+ cass_session_free(session);
+ }
+
+ cassandra_session(const cassandra_session &) = delete;
+
+ cassandra_session &operator=(const cassandra_session &) = delete;
+
+ CassSession *session;
+};
+
+}
+}
+
+#endif
diff --git a/main.cpp b/main.cpp
index 0cd5622..744f697 100644
--- a/main.cpp
+++ b/main.cpp
@@ -1,5 +1,5 @@
+#include "cassandra_support.h"
#include "mosquittopp.h"
-#include "cassandra.h"
#include "trygvis/sensor/io.h"
#include <thread>
#include <boost/lexical_cast.hpp>
@@ -9,6 +9,7 @@ using namespace std;
using namespace std::chrono;
using namespace trygvis::sensor;
using namespace trygvis::sensor::io;
+using namespace trygvis::cassandra_support;
using namespace boost;
namespace po = boost::program_options;
@@ -18,6 +19,40 @@ static auto mqtt_broker_port = 1883;
static auto queue_name = "/trygvis";
static string keyspace_name = "soil_moisture";
+static unique_ptr<cassandra_session> current_cassandra_session;
+
+struct sensor_measurement {
+ int sensor;
+ int value;
+
+ sensor_measurement(int sensor, int value) : sensor(sensor), value(value) {
+ }
+
+ ~sensor_measurement() = default;
+};
+
+struct device_measurement {
+ string device;
+ long timestamp;
+ vector<sensor_measurement> sensors;
+
+ device_measurement(string &device, long timestamp, vector<sensor_measurement> &&sensors) :
+ device(device), timestamp(timestamp), sensors(std::move(sensors)) {
+ };
+
+ ~device_measurement() = default;
+
+ string str() {
+ stringstream buf;
+ buf << "device=" << device;
+ buf << ", timestamp=" << timestamp;
+ std::for_each(sensors.begin(), sensors.end(), [&](auto &sensor) {
+ buf << ", #" << sensor.sensor << "=" + sensor.value;
+ });
+ return buf.str();
+ }
+};
+
string &&to_string(CassFuture *f) {
const char *message;
size_t message_length;
@@ -98,30 +133,7 @@ private:
};
void print_error(CassFuture *future) {
- const char *message;
- size_t message_length;
- cass_future_error_message(future, &message, &message_length);
- string msg(message, message_length);
- cout << "Cassandra error: " << msg << endl;
-}
-
-CassError execute_query(CassSession *session, const string &&query) {
- CassError rc = CASS_OK;
- CassFuture *future = NULL;
- CassStatement *statement = cass_statement_new(query.c_str(), 0);
-
- future = cass_session_execute(session, statement);
- cass_future_wait(future);
-
- rc = cass_future_error_code(future);
- if (rc != CASS_OK) {
- print_error(future);
- }
-
- cass_future_free(future);
- cass_statement_free(statement);
-
- return rc;
+ cout << "Cassandra error: " << error_message(future) << endl;
}
/*
@@ -133,84 +145,67 @@ CassError execute_query(CassSession *session, const string &&query) {
PRIMARY KEY ((device, day), timestamp)
)
*/
+CassError insert_into_sm_by_day(CassSession *session, device_measurement &&measurement) {
+ cassandra_statement q("INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);", 4);
-CassError insert_into_sm_by_day(CassSession *session, string payload) {
- CassError rc = CASS_OK;
- CassStatement *statement = nullptr;
- CassFuture *future = nullptr;
- auto query = "INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);";
-
- statement = cass_statement_new(query, 4);
-
- auto device = "aa:bb:cc:dd:ee:ff";
+ q.bind(0, measurement.device);
std::time_t t = std::time(NULL);
char day[100];
- std::strftime(day, sizeof(day), "%Y-%M-%D", std::localtime(&t));
- cass_statement_bind_string(statement, 0, device);
- cass_statement_bind_string(statement, 1, day);
+ std::strftime(day, sizeof(day), "%Y-%m-%d", std::localtime(&t));
+ q.bind(1, day);
auto timestamp = std::time(NULL);
- cass_statement_bind_int64(statement, 2, timestamp);
-
- future = cass_session_execute(session, statement);
- cass_future_wait(future);
-
- rc = cass_future_error_code(future);
- if (rc != CASS_OK) {
- print_error(future);
- }
+ q.bind(2, timestamp);
+
+ cassandra_collection sensors(CASS_COLLECTION_TYPE_LIST, measurement.sensors.size());
+ for_each(measurement.sensors.cbegin(), measurement.sensors.cend(), [&](auto sensor) {
+ cassandra_tuple tuple(2);
+ tuple.set(0, sensor.sensor);
+ tuple.set(1, sensor.value);
+ sensors.append_tuple(std::move(tuple));
+ });
- cass_future_free(future);
- cass_statement_free(statement);
+ q.bind(3, sensors);
- return rc;
+ return wait_for_future(cass_session_execute(session, q.statement));
}
-struct sensor_measurement {
- int sensor;
- int value;
-
- sensor_measurement(int sensor, int value) : sensor(sensor), value(value) {
+template<typename Target, typename Source>
+boost::optional<Target> map(boost::optional<Source> &a, std::function<Target(Source)> f) {
+ if (!a.is_initialized()) {
+ return boost::none;
}
- ~sensor_measurement() = default;
-};
-
-struct device_measurement {
- string device;
- long timestamp;
- vector<sensor_measurement> sensors;
-
- device_measurement(string &device, long timestamp, vector<sensor_measurement> &&sensors) :
- device(device), timestamp(timestamp), sensors(std::move(sensors)) {
- };
-
- ~device_measurement() = default;
-
- string str() {
- stringstream buf;
- buf << "device=" << device;
- buf << ", timestamp=" << timestamp;
- std::for_each(sensors.begin(), sensors.end(), [&](auto &sensor) {
- buf << ", #" << sensor.sensor << "=" + sensor.value;
- });
- return buf.str();
- }
-};
+ return make_optional(f(a));
+}
template<typename Target, typename Source>
-boost::optional<Target> lexical_cast_optional(boost::optional<Source> &a) {
+boost::optional<Target> flat_map(boost::optional<Source> &a, boost::optional<Target> (&f)(Source)) {
if (!a.is_initialized()) {
return boost::none;
}
+ return f(a.get());
+}
+
+//template<typename Target, typename Source>
+//boost::optional<Target> flat_map(boost::optional<Source> &a, std::function<boost::optional<Target>(Source)> f) {
+// if (!a.is_initialized()) {
+// return boost::none;
+// }
+//
+// return f(a.get());
+//}
+
+template<typename Target, typename Source = string>
+boost::optional<Target> l_c(const Source source) {
try {
- return boost::lexical_cast<Target>(a);
+ return boost::lexical_cast<Target>(source);
} catch (bad_lexical_cast &e) {
return boost::none;
}
-}
+};
void on_message(const struct mosquitto_message *message) {
string payload((const char *) message->payload, (size_t) message->payloadlen);
@@ -220,7 +215,6 @@ void on_message(const struct mosquitto_message *message) {
KeyDictionary dict;
auto sample_buffer = make_shared<VectorSampleOutputStream>();
-// auto parser = open_sample_stream_parser(sample_buffer, dict);
auto input = make_shared<KeyValueSampleStreamParser>(sample_buffer, dict);
mutable_buffers_1 buffer = boost::asio::buffer(message->payload, (std::size_t) message->payloadlen);
@@ -249,7 +243,7 @@ void on_message(const struct mosquitto_message *message) {
auto device = deviceO.get();
- auto timestamp = lexical_cast_optional<long>(timestampS);
+ auto timestamp = flat_map(timestampS, l_c<long>);
if (!timestamp) {
cout << "Invalid value for 'timestamp'" << endl;
}
@@ -257,17 +251,15 @@ void on_message(const struct mosquitto_message *message) {
vector<sensor_measurement> sensors;
for (int i = 0; i < 10; i++) {
- auto sensorS = sample.at(dict.indexOf("sensor" + to_string(i)));
- auto valueS = sample.at(dict.indexOf("value" + to_string(i)));
+ auto valueS = sample.at(dict.indexOf("sensor" + to_string(i)));
- auto sensor = lexical_cast_optional<int>(sensorS);
- auto value = lexical_cast_optional<int>(valueS);
+ auto value = flat_map(valueS, l_c<int>);
- if (!sensor || !value) {
+ if (!value) {
continue;
}
- sensors.emplace_back(sensor.get(), value.get());
+ sensors.emplace_back(i, value.get());
}
if (sensors.size() == 0) {
@@ -277,6 +269,16 @@ void on_message(const struct mosquitto_message *message) {
device_measurement measurement(device, timestamp.get(), std::move(sensors));
cout << "Measurement: " << measurement.str() << endl;
+
+ if (current_cassandra_session) {
+ auto rc = insert_into_sm_by_day(current_cassandra_session->session, std::move(measurement));
+
+ cout << "rc=" << rc << endl;
+
+ assert_ok("wait_for_future", rc);
+ } else {
+ cout << "Not connected to Cassandra" << endl;
+ }
});
}
@@ -316,20 +318,22 @@ int main(int argc, const char **argv) {
mqtt_client mqtt_client(on_message);
CassFuture *connect_future = nullptr;
CassCluster *cluster = cass_cluster_new();
- CassSession *session = cass_session_new();
+ auto session = make_unique<cassandra_session>();
cass_cluster_set_contact_points(cluster, cassandra_cluster.c_str());
- connect_future = cass_session_connect(session, cluster);
+ connect_future = cass_session_connect(session->session, cluster);
if (cass_future_error_code(connect_future) != CASS_OK) {
string s = to_string(connect_future);
cerr << "Could not connect to Cassandra:" << s << endl;
+ return EXIT_FAILURE;
}
cout << "Connected to Cassandra" << endl;
+ current_cassandra_session = std::move(session);
- execute_query(session, "USE " + keyspace_name);
+ execute_query(current_cassandra_session->session, "USE " + keyspace_name);
should_run = true;
while (should_run) {
@@ -337,5 +341,7 @@ int main(int argc, const char **argv) {
std::this_thread::sleep_for(60s);
}
+ current_cassandra_session.release();
+
return 0;
}