diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2015-07-12 14:44:31 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2015-07-12 14:44:31 +0200 |
commit | 1dcff1b442e9d99feb473e282d22b08b9197bb3b (patch) | |
tree | b61a46cb149aa1304990229c07b4db0f854456cb | |
download | mqtt-cassandra-bridge-1dcff1b442e9d99feb473e282d22b08b9197bb3b.tar.gz mqtt-cassandra-bridge-1dcff1b442e9d99feb473e282d22b08b9197bb3b.tar.bz2 mqtt-cassandra-bridge-1dcff1b442e9d99feb473e282d22b08b9197bb3b.tar.xz mqtt-cassandra-bridge-1dcff1b442e9d99feb473e282d22b08b9197bb3b.zip |
o Initial import.
-rw-r--r-- | .gitignore | 3 | ||||
-rw-r--r-- | .gitmodules | 0 | ||||
-rw-r--r-- | CMakeLists.txt | 36 | ||||
-rw-r--r-- | README.md | 16 | ||||
-rw-r--r-- | main.cpp | 311 |
5 files changed, 366 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2291abb --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.idea +*.iml +build diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/.gitmodules diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..878abf9 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,36 @@ +cmake_minimum_required(VERSION 3.2) +project(mqtt_cassandra_bridge) + +include(ExternalProject) + +add_executable(mqtt_cassandra_bridge main.cpp) +target_compile_options(mqtt_cassandra_bridge PUBLIC "-std=c++14") + +# Cassandra +set(CPP_DRIVER ${CMAKE_CURRENT_BINARY_DIR}/cpp-driver) +ExternalProject_Add(cpp-driver + URL https://github.com/datastax/cpp-driver/archive/2.1.0-beta.tar.gz + URL_MD5 d3cfde8731acc2f0f51ef9caf41068dc + PREFIX ${CPP_DRIVER} + CMAKE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=<INSTALL_DIR> +) +target_include_directories(mqtt_cassandra_bridge PRIVATE ${CPP_DRIVER}/include) +target_link_libraries(mqtt_cassandra_bridge PRIVATE ${CPP_DRIVER}/lib/libcassandra.so) +add_dependencies(mqtt_cassandra_bridge cpp-driver) + +# ble-toys +set(BLE_TOYS ${CMAKE_CURRENT_BINARY_DIR}/ble-toys) +ExternalProject_Add(ble-toys + GIT_REPOSITORY https://trygvis.io/git/2015/02/ble-toys.git + GIT_TAG 650fb016ce36cfda2e8073764196655ee6a50567 + GIT_SUBMODULES json + BUILD_ALWAYS 0 + PREFIX ${BLE_TOYS} + CMAKE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=<INSTALL_DIR> +) +add_dependencies(mqtt_cassandra_bridge ble-toys) +target_include_directories(mqtt_cassandra_bridge PRIVATE ${BLE_TOYS}/include) +target_link_libraries(mqtt_cassandra_bridge PRIVATE ${BLE_TOYS}/lib/trygvis/libtrygvis-sensor.a) + +# Mosquitto +target_link_libraries(mqtt_cassandra_bridge PRIVATE mosquitto mosquittopp) diff --git a/README.md b/README.md new file mode 100644 index 0000000..6ce8d37 --- /dev/null +++ b/README.md @@ -0,0 +1,16 @@ +# Create Schema + + $ bin/cqlsh + cqlsh> create keyspace soil_moisture WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}; + cqlsh> use soil_moisture; + cqlsh:soil_moisture> + create table sm_by_day( + device text, + day text, + timestamp timestamp, + sensors list<frozen<tuple<int, int>>>, + primary key( + (device, day), + timestamp + ) + ); diff --git a/main.cpp b/main.cpp new file mode 100644 index 0000000..806c047 --- /dev/null +++ b/main.cpp @@ -0,0 +1,311 @@ +#include "mosquittopp.h" +#include "cassandra.h" +#include "trygvis/sensor/io.h" +#include <iostream> +#include <chrono> +#include <thread> +#include <vector> +#include <boost/lexical_cast.hpp> + +using namespace std; +using namespace std::chrono; +using namespace trygvis::sensor; +using namespace trygvis::sensor::io; +using namespace boost; + +static bool should_run; +static auto mqtt_broker_host = "test.mosquitto.org"; +static auto mqtt_broker_port = 1883; +static auto queue_name = "/trygvis"; +static string keyspace_name = "soil_moisture"; + +string &&to_string(CassFuture *f) { + const char *message; + size_t message_length; + cass_future_error_message(f, &message, &message_length); + + return std::move(string(message, message_length)); +} + +class mqtt_lib { +public: + mqtt_lib() { + mosquitto_lib_init(); + } + + ~mqtt_lib() { + mosquitto_lib_cleanup(); + } +}; + +class mqtt_client : private mosqpp::mosquittopp { +public: + mqtt_client(std::function<void(const struct mosquitto_message *)> on_message_) : mosquittopp(), + on_message_(on_message_) { + cout << "Connecting to " << mqtt_broker_host << ":" << mqtt_broker_port << endl; + loop_start(); + connect_async(mqtt_broker_host, mqtt_broker_port, 10); + } + + ~mqtt_client() { + loop_stop(true); + disconnect(); + } + +private: + std::function<void(const struct mosquitto_message *)> on_message_; + bool subscribed = false; + + void on_connect(int rc) override { + cout << "Connected to MQTT broker" << endl; +// should_run = false; + int qos = 0; + if (!subscribed) { + subscribed = true; + cout << "Subscribing..." << endl; + subscribe(nullptr, queue_name, qos); + } + } + + void on_disconnect(int rc) override { + subscribed = false; + cout << "Oops, disconnected" << endl; + } + + void on_publish(int mid) override { + } + + void on_message(const struct mosquitto_message *message) override { + string payload((const char *) message->payload, (size_t) message->payloadlen); + on_message_(message); + } + + void on_subscribe(int mid, int qos_count, const int *granted_qos) override { + cout << "Subscribed" << endl; + } + + void on_unsubscribe(int mid) override { + cout << "Oops, unsubscribed" << endl; + } + + void on_log(int level, const char *str) override { + cout << "MQTT: " << level << ":" << str << endl; + } + + void on_error() override { + cout << "Oops, error" << endl; + } +}; + +void print_error(CassFuture *future) { + const char *message; + size_t message_length; + cass_future_error_message(future, &message, &message_length); + string msg(message, message_length); + cout << "Cassandra error: " << msg << endl; +} + +CassError execute_query(CassSession *session, const string &&query) { + CassError rc = CASS_OK; + CassFuture *future = NULL; + CassStatement *statement = cass_statement_new(query.c_str(), 0); + + future = cass_session_execute(session, statement); + cass_future_wait(future); + + rc = cass_future_error_code(future); + if (rc != CASS_OK) { + print_error(future); + } + + cass_future_free(future); + cass_statement_free(statement); + + return rc; +} + +/* + CREATE TABLE sm_by_day ( + device text, + day text, + timestamp timestamp, + sensors list<frozen<tuple<int, int>>>, + PRIMARY KEY ((device, day), timestamp) + ) + */ + +CassError insert_into_sm_by_day(CassSession *session, string payload) { + CassError rc = CASS_OK; + CassStatement *statement = nullptr; + CassFuture *future = nullptr; + auto query = "INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);"; + + statement = cass_statement_new(query, 4); + + auto device = "aa:bb:cc:dd:ee:ff"; + + std::time_t t = std::time(NULL); + char day[100]; + std::strftime(day, sizeof(day), "%Y-%M-%D", std::localtime(&t)); + cass_statement_bind_string(statement, 0, device); + cass_statement_bind_string(statement, 1, day); + + auto timestamp = std::time(NULL); + cass_statement_bind_int64(statement, 2, timestamp); + + future = cass_session_execute(session, statement); + cass_future_wait(future); + + rc = cass_future_error_code(future); + if (rc != CASS_OK) { + print_error(future); + } + + cass_future_free(future); + cass_statement_free(statement); + + return rc; +} + +struct sensor_measurement { + int sensor; + int value; + + sensor_measurement(int sensor, int value) : sensor(sensor), value(value) { + } + + ~sensor_measurement() = default; +}; + +struct device_measurement { + string device; + long timestamp; + vector<sensor_measurement> sensors; + + device_measurement(string &device, long timestamp, vector<sensor_measurement> &&sensors) : + device(device), timestamp(timestamp), sensors(std::move(sensors)) { + }; + + ~device_measurement() = default; + + string str() { + stringstream buf; + buf << "device=" << device; + buf << ", timestamp=" << timestamp; + std::for_each(sensors.begin(), sensors.end(), [&](auto &sensor) { + buf << ", #" << sensor.sensor << "=" + sensor.value; + }); + return buf.str(); + } +}; + +template<typename Target, typename Source> +boost::optional<Target> lexical_cast_optional(boost::optional<Source> &a) { + if (!a.is_initialized()) { + return boost::none; + } + + try { + return boost::lexical_cast<Target>(a); + } catch (bad_lexical_cast &e) { + return boost::none; + } +} + +void on_message(const struct mosquitto_message *message) { + string payload((const char *) message->payload, (size_t) message->payloadlen); + cout << "storing message: " << endl; + cout << payload << endl; + cout << "----------------------------------------" << endl; + + KeyDictionary dict; + auto sample_buffer = make_shared<VectorSampleOutputStream>(); +// auto parser = open_sample_stream_parser(sample_buffer, dict); + auto input = make_shared<KeyValueSampleStreamParser>(sample_buffer, dict); + + mutable_buffers_1 buffer = boost::asio::buffer(message->payload, (std::size_t) message->payloadlen); + + input->process(buffer); + input->finish(); + + cout << "sample_buffer->samples: " << sample_buffer->samples.size() << endl; + + auto device_key = dict.indexOf("device"); + auto timestamp_key = dict.indexOf("timestamp"); + + std::for_each(sample_buffer->samples.cbegin(), sample_buffer->samples.cend(), [&](auto &sample) { + cout << "Sample: " << sample.to_string() << endl; + + auto deviceO = sample.at(device_key); + auto timestampS = sample.at(timestamp_key); + + if (!deviceO) { + cout << "Missing required key 'device'" << endl; + } + + if (!timestampS) { + cout << "Missing required key 'timestamp'" << endl; + } + + auto device = deviceO.get(); + + auto timestamp = lexical_cast_optional<long>(timestampS); + if (!timestamp) { + cout << "Invalid value for 'timestamp'" << endl; + } + + vector<sensor_measurement> sensors; + + for (int i = 0; i < 10; i++) { + auto sensorS = sample.at(dict.indexOf("sensor" + to_string(i))); + auto valueS = sample.at(dict.indexOf("value" + to_string(i))); + + auto sensor = lexical_cast_optional<int>(sensorS); + auto value = lexical_cast_optional<int>(valueS); + + if (!sensor || !value) { + continue; + } + + sensors.emplace_back(sensor.get(), value.get()); + } + + if (sensors.size() == 0) { + return; + } + + device_measurement measurement(device, timestamp.get(), std::move(sensors)); + + cout << "Measurement: " << measurement.str() << endl; + }); +} + +int main() { + mqtt_lib mqtt_lib(); + + mqtt_client mqtt_client(on_message); + CassFuture *connect_future = nullptr; + CassCluster *cluster = cass_cluster_new(); + CassSession *session = cass_session_new(); + + cass_cluster_set_contact_points(cluster, "127.0.0.1"); // , 127.0.0.2, 127.0.0.3 + + connect_future = cass_session_connect(session, cluster); + + if (cass_future_error_code(connect_future) != CASS_OK) { + string s = to_string(connect_future); + cerr << "Could not connect to Cassandra:" << s << endl; + } + + cout << "Connected to Cassandra" << endl; + + execute_query(session, "USE " + keyspace_name); + + should_run = true; + while (should_run) { + cout << "sleeping.." << endl; + std::this_thread::sleep_for(10s); + } + + return 0; +} |