aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2015-07-12 14:44:31 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2015-07-12 14:44:31 +0200
commit1dcff1b442e9d99feb473e282d22b08b9197bb3b (patch)
treeb61a46cb149aa1304990229c07b4db0f854456cb
downloadmqtt-cassandra-bridge-1dcff1b442e9d99feb473e282d22b08b9197bb3b.tar.gz
mqtt-cassandra-bridge-1dcff1b442e9d99feb473e282d22b08b9197bb3b.tar.bz2
mqtt-cassandra-bridge-1dcff1b442e9d99feb473e282d22b08b9197bb3b.tar.xz
mqtt-cassandra-bridge-1dcff1b442e9d99feb473e282d22b08b9197bb3b.zip
o Initial import.
-rw-r--r--.gitignore3
-rw-r--r--.gitmodules0
-rw-r--r--CMakeLists.txt36
-rw-r--r--README.md16
-rw-r--r--main.cpp311
5 files changed, 366 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..2291abb
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+.idea
+*.iml
+build
diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/.gitmodules
diff --git a/CMakeLists.txt b/CMakeLists.txt
new file mode 100644
index 0000000..878abf9
--- /dev/null
+++ b/CMakeLists.txt
@@ -0,0 +1,36 @@
+cmake_minimum_required(VERSION 3.2)
+project(mqtt_cassandra_bridge)
+
+include(ExternalProject)
+
+add_executable(mqtt_cassandra_bridge main.cpp)
+target_compile_options(mqtt_cassandra_bridge PUBLIC "-std=c++14")
+
+# Cassandra
+set(CPP_DRIVER ${CMAKE_CURRENT_BINARY_DIR}/cpp-driver)
+ExternalProject_Add(cpp-driver
+ URL https://github.com/datastax/cpp-driver/archive/2.1.0-beta.tar.gz
+ URL_MD5 d3cfde8731acc2f0f51ef9caf41068dc
+ PREFIX ${CPP_DRIVER}
+ CMAKE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=<INSTALL_DIR>
+)
+target_include_directories(mqtt_cassandra_bridge PRIVATE ${CPP_DRIVER}/include)
+target_link_libraries(mqtt_cassandra_bridge PRIVATE ${CPP_DRIVER}/lib/libcassandra.so)
+add_dependencies(mqtt_cassandra_bridge cpp-driver)
+
+# ble-toys
+set(BLE_TOYS ${CMAKE_CURRENT_BINARY_DIR}/ble-toys)
+ExternalProject_Add(ble-toys
+ GIT_REPOSITORY https://trygvis.io/git/2015/02/ble-toys.git
+ GIT_TAG 650fb016ce36cfda2e8073764196655ee6a50567
+ GIT_SUBMODULES json
+ BUILD_ALWAYS 0
+ PREFIX ${BLE_TOYS}
+ CMAKE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=<INSTALL_DIR>
+)
+add_dependencies(mqtt_cassandra_bridge ble-toys)
+target_include_directories(mqtt_cassandra_bridge PRIVATE ${BLE_TOYS}/include)
+target_link_libraries(mqtt_cassandra_bridge PRIVATE ${BLE_TOYS}/lib/trygvis/libtrygvis-sensor.a)
+
+# Mosquitto
+target_link_libraries(mqtt_cassandra_bridge PRIVATE mosquitto mosquittopp)
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..6ce8d37
--- /dev/null
+++ b/README.md
@@ -0,0 +1,16 @@
+# Create Schema
+
+ $ bin/cqlsh
+ cqlsh> create keyspace soil_moisture WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
+ cqlsh> use soil_moisture;
+ cqlsh:soil_moisture>
+ create table sm_by_day(
+ device text,
+ day text,
+ timestamp timestamp,
+ sensors list<frozen<tuple<int, int>>>,
+ primary key(
+ (device, day),
+ timestamp
+ )
+ );
diff --git a/main.cpp b/main.cpp
new file mode 100644
index 0000000..806c047
--- /dev/null
+++ b/main.cpp
@@ -0,0 +1,311 @@
+#include "mosquittopp.h"
+#include "cassandra.h"
+#include "trygvis/sensor/io.h"
+#include <iostream>
+#include <chrono>
+#include <thread>
+#include <vector>
+#include <boost/lexical_cast.hpp>
+
+using namespace std;
+using namespace std::chrono;
+using namespace trygvis::sensor;
+using namespace trygvis::sensor::io;
+using namespace boost;
+
+static bool should_run;
+static auto mqtt_broker_host = "test.mosquitto.org";
+static auto mqtt_broker_port = 1883;
+static auto queue_name = "/trygvis";
+static string keyspace_name = "soil_moisture";
+
+string &&to_string(CassFuture *f) {
+ const char *message;
+ size_t message_length;
+ cass_future_error_message(f, &message, &message_length);
+
+ return std::move(string(message, message_length));
+}
+
+class mqtt_lib {
+public:
+ mqtt_lib() {
+ mosquitto_lib_init();
+ }
+
+ ~mqtt_lib() {
+ mosquitto_lib_cleanup();
+ }
+};
+
+class mqtt_client : private mosqpp::mosquittopp {
+public:
+ mqtt_client(std::function<void(const struct mosquitto_message *)> on_message_) : mosquittopp(),
+ on_message_(on_message_) {
+ cout << "Connecting to " << mqtt_broker_host << ":" << mqtt_broker_port << endl;
+ loop_start();
+ connect_async(mqtt_broker_host, mqtt_broker_port, 10);
+ }
+
+ ~mqtt_client() {
+ loop_stop(true);
+ disconnect();
+ }
+
+private:
+ std::function<void(const struct mosquitto_message *)> on_message_;
+ bool subscribed = false;
+
+ void on_connect(int rc) override {
+ cout << "Connected to MQTT broker" << endl;
+// should_run = false;
+ int qos = 0;
+ if (!subscribed) {
+ subscribed = true;
+ cout << "Subscribing..." << endl;
+ subscribe(nullptr, queue_name, qos);
+ }
+ }
+
+ void on_disconnect(int rc) override {
+ subscribed = false;
+ cout << "Oops, disconnected" << endl;
+ }
+
+ void on_publish(int mid) override {
+ }
+
+ void on_message(const struct mosquitto_message *message) override {
+ string payload((const char *) message->payload, (size_t) message->payloadlen);
+ on_message_(message);
+ }
+
+ void on_subscribe(int mid, int qos_count, const int *granted_qos) override {
+ cout << "Subscribed" << endl;
+ }
+
+ void on_unsubscribe(int mid) override {
+ cout << "Oops, unsubscribed" << endl;
+ }
+
+ void on_log(int level, const char *str) override {
+ cout << "MQTT: " << level << ":" << str << endl;
+ }
+
+ void on_error() override {
+ cout << "Oops, error" << endl;
+ }
+};
+
+void print_error(CassFuture *future) {
+ const char *message;
+ size_t message_length;
+ cass_future_error_message(future, &message, &message_length);
+ string msg(message, message_length);
+ cout << "Cassandra error: " << msg << endl;
+}
+
+CassError execute_query(CassSession *session, const string &&query) {
+ CassError rc = CASS_OK;
+ CassFuture *future = NULL;
+ CassStatement *statement = cass_statement_new(query.c_str(), 0);
+
+ future = cass_session_execute(session, statement);
+ cass_future_wait(future);
+
+ rc = cass_future_error_code(future);
+ if (rc != CASS_OK) {
+ print_error(future);
+ }
+
+ cass_future_free(future);
+ cass_statement_free(statement);
+
+ return rc;
+}
+
+/*
+ CREATE TABLE sm_by_day (
+ device text,
+ day text,
+ timestamp timestamp,
+ sensors list<frozen<tuple<int, int>>>,
+ PRIMARY KEY ((device, day), timestamp)
+ )
+ */
+
+CassError insert_into_sm_by_day(CassSession *session, string payload) {
+ CassError rc = CASS_OK;
+ CassStatement *statement = nullptr;
+ CassFuture *future = nullptr;
+ auto query = "INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);";
+
+ statement = cass_statement_new(query, 4);
+
+ auto device = "aa:bb:cc:dd:ee:ff";
+
+ std::time_t t = std::time(NULL);
+ char day[100];
+ std::strftime(day, sizeof(day), "%Y-%M-%D", std::localtime(&t));
+ cass_statement_bind_string(statement, 0, device);
+ cass_statement_bind_string(statement, 1, day);
+
+ auto timestamp = std::time(NULL);
+ cass_statement_bind_int64(statement, 2, timestamp);
+
+ future = cass_session_execute(session, statement);
+ cass_future_wait(future);
+
+ rc = cass_future_error_code(future);
+ if (rc != CASS_OK) {
+ print_error(future);
+ }
+
+ cass_future_free(future);
+ cass_statement_free(statement);
+
+ return rc;
+}
+
+struct sensor_measurement {
+ int sensor;
+ int value;
+
+ sensor_measurement(int sensor, int value) : sensor(sensor), value(value) {
+ }
+
+ ~sensor_measurement() = default;
+};
+
+struct device_measurement {
+ string device;
+ long timestamp;
+ vector<sensor_measurement> sensors;
+
+ device_measurement(string &device, long timestamp, vector<sensor_measurement> &&sensors) :
+ device(device), timestamp(timestamp), sensors(std::move(sensors)) {
+ };
+
+ ~device_measurement() = default;
+
+ string str() {
+ stringstream buf;
+ buf << "device=" << device;
+ buf << ", timestamp=" << timestamp;
+ std::for_each(sensors.begin(), sensors.end(), [&](auto &sensor) {
+ buf << ", #" << sensor.sensor << "=" + sensor.value;
+ });
+ return buf.str();
+ }
+};
+
+template<typename Target, typename Source>
+boost::optional<Target> lexical_cast_optional(boost::optional<Source> &a) {
+ if (!a.is_initialized()) {
+ return boost::none;
+ }
+
+ try {
+ return boost::lexical_cast<Target>(a);
+ } catch (bad_lexical_cast &e) {
+ return boost::none;
+ }
+}
+
+void on_message(const struct mosquitto_message *message) {
+ string payload((const char *) message->payload, (size_t) message->payloadlen);
+ cout << "storing message: " << endl;
+ cout << payload << endl;
+ cout << "----------------------------------------" << endl;
+
+ KeyDictionary dict;
+ auto sample_buffer = make_shared<VectorSampleOutputStream>();
+// auto parser = open_sample_stream_parser(sample_buffer, dict);
+ auto input = make_shared<KeyValueSampleStreamParser>(sample_buffer, dict);
+
+ mutable_buffers_1 buffer = boost::asio::buffer(message->payload, (std::size_t) message->payloadlen);
+
+ input->process(buffer);
+ input->finish();
+
+ cout << "sample_buffer->samples: " << sample_buffer->samples.size() << endl;
+
+ auto device_key = dict.indexOf("device");
+ auto timestamp_key = dict.indexOf("timestamp");
+
+ std::for_each(sample_buffer->samples.cbegin(), sample_buffer->samples.cend(), [&](auto &sample) {
+ cout << "Sample: " << sample.to_string() << endl;
+
+ auto deviceO = sample.at(device_key);
+ auto timestampS = sample.at(timestamp_key);
+
+ if (!deviceO) {
+ cout << "Missing required key 'device'" << endl;
+ }
+
+ if (!timestampS) {
+ cout << "Missing required key 'timestamp'" << endl;
+ }
+
+ auto device = deviceO.get();
+
+ auto timestamp = lexical_cast_optional<long>(timestampS);
+ if (!timestamp) {
+ cout << "Invalid value for 'timestamp'" << endl;
+ }
+
+ vector<sensor_measurement> sensors;
+
+ for (int i = 0; i < 10; i++) {
+ auto sensorS = sample.at(dict.indexOf("sensor" + to_string(i)));
+ auto valueS = sample.at(dict.indexOf("value" + to_string(i)));
+
+ auto sensor = lexical_cast_optional<int>(sensorS);
+ auto value = lexical_cast_optional<int>(valueS);
+
+ if (!sensor || !value) {
+ continue;
+ }
+
+ sensors.emplace_back(sensor.get(), value.get());
+ }
+
+ if (sensors.size() == 0) {
+ return;
+ }
+
+ device_measurement measurement(device, timestamp.get(), std::move(sensors));
+
+ cout << "Measurement: " << measurement.str() << endl;
+ });
+}
+
+int main() {
+ mqtt_lib mqtt_lib();
+
+ mqtt_client mqtt_client(on_message);
+ CassFuture *connect_future = nullptr;
+ CassCluster *cluster = cass_cluster_new();
+ CassSession *session = cass_session_new();
+
+ cass_cluster_set_contact_points(cluster, "127.0.0.1"); // , 127.0.0.2, 127.0.0.3
+
+ connect_future = cass_session_connect(session, cluster);
+
+ if (cass_future_error_code(connect_future) != CASS_OK) {
+ string s = to_string(connect_future);
+ cerr << "Could not connect to Cassandra:" << s << endl;
+ }
+
+ cout << "Connected to Cassandra" << endl;
+
+ execute_query(session, "USE " + keyspace_name);
+
+ should_run = true;
+ while (should_run) {
+ cout << "sleeping.." << endl;
+ std::this_thread::sleep_for(10s);
+ }
+
+ return 0;
+}