aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2015-08-02 16:27:10 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2015-08-02 16:27:10 +0200
commit06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42 (patch)
treebc272faec6c585693341a814c65483e86c56255b
parentb632036b153297f83b10f6d960ccfe0c1772f00e (diff)
downloadmqtt-cassandra-bridge-06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42.tar.gz
mqtt-cassandra-bridge-06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42.tar.bz2
mqtt-cassandra-bridge-06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42.tar.xz
mqtt-cassandra-bridge-06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42.zip
o Adding a MQTT consumer that stores the parsed record in Cassandra.
-rw-r--r--.gitmodules0
-rw-r--r--CMakeLists.txt28
-rw-r--r--README.md13
-rw-r--r--cassandra_support.h221
-rw-r--r--misc_support.h49
-rw-r--r--mqtt_support.h25
-rw-r--r--raw-mqtt-consumer.cpp239
-rw-r--r--sm-http-server.cpp2
-rw-r--r--sm-mqtt-consumer.cpp78
9 files changed, 565 insertions, 90 deletions
diff --git a/.gitmodules b/.gitmodules
deleted file mode 100644
index e69de29..0000000
--- a/.gitmodules
+++ /dev/null
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 75645c0..f63f023 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -6,6 +6,7 @@ find_package(Boost COMPONENTS system program_options unit_test_framework REQUIRE
include(ExternalProject)
set(SHARED_COMPILE_OPTIONS "-std=c++14")
+set(SHARED_SOURCES misc_support.h)
# Cassandra
set(CPP_DRIVER ${CMAKE_CURRENT_BINARY_DIR}/cpp-driver)
@@ -13,17 +14,17 @@ ExternalProject_Add(cpp-driver
URL https://github.com/datastax/cpp-driver/archive/2.1.0-beta.tar.gz
URL_MD5 d3cfde8731acc2f0f51ef9caf41068dc
PREFIX ${CPP_DRIVER}
- CMAKE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=<INSTALL_DIR>
+ CMAKE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=<INSTALL_DIR> -DCMAKE_C_COMPILER:FILE=${CMAKE_C_COMPILER} -DCMAKE_CXX_COMPILER:FILE=${CMAKE_CXX_COMPILER}
)
# ble-toys
set(BLE_TOYS ${CMAKE_CURRENT_BINARY_DIR}/ble-toys)
ExternalProject_Add(ble-toys
GIT_REPOSITORY https://trygvis.io/git/2015/02/ble-toys.git
- GIT_TAG 650fb016ce36cfda2e8073764196655ee6a50567
+ GIT_TAG f6493150c1a7172bcd8c9cc1790829285f707ee9
GIT_SUBMODULES json
PREFIX ${BLE_TOYS}
- CMAKE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=<INSTALL_DIR>
+ CMAKE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=<INSTALL_DIR> -DCMAKE_C_COMPILER:FILE=${CMAKE_C_COMPILER} -DCMAKE_CXX_COMPILER:FILE=${CMAKE_CXX_COMPILER}
)
# nghttp2
@@ -45,7 +46,7 @@ ExternalProject_Add(nghttp2
# TODO: proper discovery
# sm-mqtt-consumer
-add_executable(sm-mqtt-consumer sm-mqtt-consumer.cpp cassandra_support.h)
+add_executable(sm-mqtt-consumer sm-mqtt-consumer.cpp cassandra_support.h mqtt_support.h ${SHARED_SOURCES})
add_dependencies(sm-mqtt-consumer cpp-driver)
## Boost
target_link_libraries(sm-mqtt-consumer PRIVATE ${Boost_LIBRARIES})
@@ -60,8 +61,24 @@ add_dependencies(sm-mqtt-consumer ble-toys)
target_include_directories(sm-mqtt-consumer PRIVATE ${BLE_TOYS}/include)
target_link_libraries(sm-mqtt-consumer PRIVATE ${BLE_TOYS}/lib/trygvis/libtrygvis-sensor.a)
+# raw-mqtt-consumer
+add_executable(raw-mqtt-consumer raw-mqtt-consumer.cpp cassandra_support.h mqtt_support.h ${SHARED_SOURCES})
+add_dependencies(raw-mqtt-consumer cpp-driver)
+## Boost
+target_link_libraries(raw-mqtt-consumer PRIVATE ${Boost_LIBRARIES})
+## Cassandra
+target_include_directories(raw-mqtt-consumer PRIVATE ${CPP_DRIVER}/include)
+target_link_libraries(raw-mqtt-consumer PRIVATE ${CPP_DRIVER}/lib/libcassandra.so)
+## Mosquitto
+target_compile_options(raw-mqtt-consumer PUBLIC ${SHARED_COMPILE_OPTIONS})
+target_link_libraries(raw-mqtt-consumer PRIVATE mosquitto mosquittopp)
+## Ble toys
+add_dependencies(raw-mqtt-consumer ble-toys)
+target_include_directories(raw-mqtt-consumer PRIVATE ${BLE_TOYS}/include)
+target_link_libraries(raw-mqtt-consumer PRIVATE ${BLE_TOYS}/lib/trygvis/libtrygvis-sensor.a)
+
# sm-http-server
-add_executable(sm-http-server sm-http-server.cpp cassandra_support.h http_support.h)
+add_executable(sm-http-server sm-http-server.cpp cassandra_support.h http_support.h ${SHARED_SOURCES})
target_compile_options(sm-http-server PUBLIC ${SHARED_COMPILE_OPTIONS})
## Boost
target_link_libraries(sm-http-server PRIVATE ${Boost_LIBRARIES})
@@ -76,6 +93,7 @@ target_link_libraries(sm-http-server PRIVATE ${NGHTTP2}/lib/libnghttp2_asio.a ${
## Misc
target_link_libraries(sm-http-server PRIVATE ssl crypto pthread)
+# Testing
enable_testing()
add_executable(http-tests http-tests.cpp http_support.h)
target_compile_options(http-tests PUBLIC ${SHARED_COMPILE_OPTIONS})
diff --git a/README.md b/README.md
index 91681e4..5d39e63 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-# Schema
+# Soil Moisture Schema
CREATE TABLE sm_by_day (
device text,
@@ -11,6 +11,17 @@
)
);
+# Raw Schema
+
+ CREATE TABLE raw_record (
+ day text,
+ timestamp timestamp,
+ records list<frozen<tuple<text, text>>>,
+ PRIMARY KEY (
+ (day),
+ timestamp
+ )
+ );
# Create Schema
diff --git a/cassandra_support.h b/cassandra_support.h
index 7b50296..3f49be2 100644
--- a/cassandra_support.h
+++ b/cassandra_support.h
@@ -1,14 +1,18 @@
#ifndef TRYGVIS_CASSANDRA_SUPPORT_H
#define TRYGVIS_CASSANDRA_SUPPORT_H
-#include <stddef.h>
+#include <cassandra.h>
+
#include <string>
+#include <cassert>
#include <algorithm>
-#include <cassandra.h>
#include <stdexcept>
#include <functional>
#include <vector>
#include <iostream>
+#include <sstream>
+#include <iomanip>
+#include <mutex>
namespace trygvis {
namespace cassandra_support {
@@ -17,7 +21,7 @@ using namespace std;
class cassandra_error : public runtime_error {
public:
- cassandra_error(const string &context, string &&error) : runtime_error(
+ cassandra_error(const string &context, const string &error) : runtime_error(
"Cassandra error: context=" + context + ", error=" + error) {
}
@@ -26,14 +30,7 @@ public:
}
};
-string error_message(CassFuture *future) {
- const char *message;
- size_t message_length;
- cass_future_error_message(future, &message, &message_length);
- return string(message, message_length);
-}
-
-static CassError execute_query(CassSession *session, const string &&query) {
+static CassError execute_query(CassSession *session, const string query) {
CassStatement *statement = cass_statement_new(query.c_str(), 0);
CassFuture *future = cass_session_execute(session, statement);
@@ -82,7 +79,7 @@ protected:
cassandra_wrapper &operator=(const cassandra_wrapper &) = delete;
virtual ~cassandra_wrapper() {
- underlying_ = (Underlying *)0xaabbccdd;
+ underlying_ = reinterpret_cast<Underlying *>(0xaabbccdd);
}
Underlying *underlying_;
@@ -107,18 +104,155 @@ public:
cassandra_result &operator=(const cassandra_result &) = delete;
};
-static const CassResult *r;
+class cassandra_future2 {
+public:
+ // TODO: the values shouldn't be the future, but a specific result object instead
+ typedef void(callback_t)(cassandra_future2 &);
+
+ static
+ cassandra_future2 *wrap(CassFuture *future) {
+ return new cassandra_future2(future);
+ }
+
+private:
+ typedef std::lock_guard<std::mutex> guard;
+
+ callback_t *callback_ = nullptr;
+ CassFuture *future;
+ bool has_callback = false;
+ bool has_data = false;
+
+ std::mutex mutex;
+
+ cassandra_future2(CassFuture *future) : future(future), callback_(default_callback) {
+ cout << "cassandra_future2: this=" << std::hex << std::setw(8) << this <<
+ ", future=" << std::hex << std::setw(8) << future << endl;
+ cass_future_set_callback(future, callback, this);
+ }
+
+ static void default_callback(cassandra_future2 &) {
+ cout << "default_callback" << endl;
+ }
+
+ ~cassandra_future2() {
+ cout << "~cassandra_future2: this=" << std::hex << std::setw(8) << this << endl;
+// cout << std::hex << std::setw(8) << "freeing future: " << underlying_ << endl;
+// cass_future_free(underlying_);
+ }
+
+ cassandra_future2(const cassandra_future2 &) = delete;
+
+ cassandra_future2 &operator=(const cassandra_future2 &) = delete;
+
+public:
+ void then(callback_t callback_) {
+ bool do_delete = false;
+
+ {
+ guard lock(mutex);
+
+ assert(callback_ != nullptr);
+ this->callback_ = callback_;
+ has_callback = true;
+
+ if (has_data) {
+ cout << "Had early data" << endl;
+ this->callback(future);
+
+ cout << "freeing future: " << std::hex << std::setw(8) << future << endl;
+ cass_future_free(future);
+ do_delete = true;
+ }
+ }
+
+ if (do_delete) {
+ delete this;
+ }
+ }
+
+ bool fetched = false;
+
+ cassandra_result result() {
+ if (fetched) {
+ throw cassandra_error("cassandra_result::result()", "Already fetched");
+ }
+ fetched = true;
+ const CassResult *x = cass_future_get_result(future);
+ size_t count = cass_result_row_count(x);
+ return std::move(cassandra_result(x));
+ }
+
+ static
+ string error_message(CassFuture * future) {
+ const char *message;
+ size_t message_length;
+ cass_future_error_message(future, &message, &message_length);
+ return string(message, message_length);
+ }
+
+ string error_message() {
+ return error_message(future);
+ }
+
+ CassError error_code() const {
+ return cass_future_error_code(future);
+ }
-class cassandra_future;
+ bool operator!() const {
+ return !ok();
+ }
-typedef std::function<void(cassandra_future &)> callback_type;
+ operator bool() const {
+ return ok();
+ }
-struct tmp {
- callback_type cb;
+ bool ok() const {
+ return error_code() == CASS_OK;
+ }
+
+private:
+
+ static void callback(CassFuture *f, void *data) {
+ cassandra_future2 *c_f = static_cast<cassandra_future2 *>(data);
+ c_f->callback(f);
+ }
+
+ void callback(CassFuture *future) {
+ bool do_delete = false;
+
+ {
+ guard lock(mutex);
+
+ cout << "cassandra_future::callback, error=" << cassandra_future2::error_message(future) << endl;
+ cout << "cassandra_future::callback, this=" << std::hex << std::setw(8) << (this) << endl;
+ CassError rc = cass_future_error_code(future);
+
+ has_data = true;
+ if (has_callback) {
+ cout << "had callback already" << endl;
+ callback_(*this);
+
+ cout << "freeing future: " << std::hex << std::setw(8) << future << endl;
+ cass_future_free(future);
+
+ do_delete = true;
+ }
+ }
+
+ if (do_delete) {
+ delete this;
+ }
+ }
};
class cassandra_future : public cassandra_wrapper<CassFuture> {
public:
+ typedef std::function<void(cassandra_future &)> callback_type;
+
+ struct tmp {
+ cassandra_future::callback_type cb;
+ };
+
cassandra_future(CassFuture *future) : cassandra_wrapper(future) {
}
@@ -140,11 +274,13 @@ public:
return std::move(cassandra_result(x));
}
+ static
+ string error_message(CassFuture *future) {
+ return cassandra_future2::error_message(future);
+ }
+
string error_message() {
- const char *message;
- size_t message_length;
- cass_future_error_message(underlying(), &message, &message_length);
- return string(message, message_length);
+ return error_message(underlying());
}
CassError error_code() const {
@@ -185,10 +321,14 @@ public:
cassandra_tuple &operator=(const cassandra_tuple &) = delete;
- void set(size_t i, cass_int32_t value) {
+ void set(size_t i, const cass_int32_t value) {
cass_tuple_set_int32(tuple, i, value);
}
+ void set(size_t i, const string &s) {
+ cass_tuple_set_string_n(tuple, i, s.c_str(), s.length());
+ }
+
CassTuple *tuple;
};
@@ -317,10 +457,19 @@ public:
cassandra_session &operator=(const cassandra_session &) = delete;
- void execute(cassandra_statement &&stmt, callback_type cb_) {
- auto statement = cass_statement_new("SELECT device, timestamp, sensors FROM sm_by_day", 0);
+ auto connect(CassCluster *cluster) {
+ return cass_session_connect(session, cluster);
+ }
+
+ [[deprecated]]
+ void execute(cassandra_statement &&stmt, cassandra_future::callback_type cb_) {
+ auto future = cass_session_execute(session, stmt.underlying());
+ cass_future_set_callback(future, cassandra_future::callback, new cassandra_future::tmp{cb_});
+ }
+
+ cassandra_future2 *execute2(cassandra_statement &&stmt) {
auto future = cass_session_execute(session, stmt.underlying());
- cass_future_set_callback(future, cassandra_future::callback, new tmp{cb_});
+ return cassandra_future2::wrap(future);
}
operator CassSession *() const {
@@ -335,6 +484,28 @@ private:
CassSession *session;
};
+class cassandra_logging {
+public:
+ cassandra_logging(CassLogLevel log_level = CASS_LOG_DEBUG) {
+ cass_log_set_level(log_level);
+ cass_log_set_callback(on_log, this);
+ }
+
+ ~cassandra_logging() {
+ }
+
+private:
+ static
+ void on_log(const CassLogMessage *message, void *data) {
+ stringstream buf;
+ buf << message->time_ms << " " << cass_log_level_string(message->severity) << " " <<
+ message->file << ":" << message->function << ":" <<
+ message->line << ":" << message->message;
+
+ cout << "CASSANDRA: " << buf.str() << endl;
+ }
+};
+
}
}
diff --git a/misc_support.h b/misc_support.h
new file mode 100644
index 0000000..b00f6a8
--- /dev/null
+++ b/misc_support.h
@@ -0,0 +1,49 @@
+#ifndef SOIL_MOISTURE_MISC_SUPPORT_H
+#define SOIL_MOISTURE_MISC_SUPPORT_H
+
+#include <boost/lexical_cast.hpp>
+#include <boost/program_options.hpp>
+#include <boost/optional.hpp>
+#include <string>
+
+namespace trygvis {
+namespace misc_support {
+
+using namespace boost;
+using namespace std;
+namespace po = boost::program_options;
+
+template<typename Target, typename Source>
+static
+boost::optional<Target> map(boost::optional<Source> &a, std::function<Target(Source)> f) {
+ if (!a.is_initialized()) {
+ return boost::none;
+ }
+
+ return make_optional(f(a));
+}
+
+template<typename Target, typename Source>
+static
+boost::optional<Target> flat_map(boost::optional<Source> &a, boost::optional<Target> (&f)(Source)) {
+ if (!a.is_initialized()) {
+ return boost::none;
+ }
+
+ return f(a.get());
+}
+
+template<typename Target, typename Source = string>
+static
+boost::optional<Target> l_c(const Source source) {
+ try {
+ return boost::lexical_cast<Target>(source);
+ } catch (bad_lexical_cast &e) {
+ return boost::none;
+ }
+};
+
+}
+}
+
+#endif
diff --git a/mqtt_support.h b/mqtt_support.h
new file mode 100644
index 0000000..9a9f7fe
--- /dev/null
+++ b/mqtt_support.h
@@ -0,0 +1,25 @@
+#ifndef TRYGVIS_MQTT_SUPPORT_H
+#define TRYGVIS_MQTT_SUPPORT_H
+
+#include "mosquittopp.h"
+
+namespace trygvis {
+namespace mqtt_support {
+
+using namespace mosqpp;
+
+class mqtt_lib {
+public:
+ mqtt_lib() {
+ mosquitto_lib_init();
+ }
+
+ ~mqtt_lib() {
+ mosquitto_lib_cleanup();
+ }
+};
+
+}
+}
+
+#endif
diff --git a/raw-mqtt-consumer.cpp b/raw-mqtt-consumer.cpp
new file mode 100644
index 0000000..421d18c
--- /dev/null
+++ b/raw-mqtt-consumer.cpp
@@ -0,0 +1,239 @@
+#include "cassandra_support.h"
+#include "mqtt_support.h"
+#include <thread>
+#include <boost/lexical_cast.hpp>
+#include <boost/program_options.hpp>
+#include <trygvis/sensor.h>
+#include <trygvis/sensor/io.h>
+
+namespace raw_mqtt_consumer {
+
+using namespace std;
+using namespace std::chrono;
+using namespace trygvis::cassandra_support;
+using namespace trygvis::mqtt_support;
+using namespace trygvis::sensor;
+using namespace trygvis::sensor::io;
+using namespace boost;
+namespace po = boost::program_options;
+
+static bool should_run;
+static string mqtt_host;
+static int mqtt_port;
+static string mqtt_topic;
+static string keyspace_name = "soil_moisture";
+
+static unique_ptr<cassandra_session> current_cassandra_session;
+
+struct measurement {
+ KeyDictionary dict;
+};
+
+class raw_mqtt_client : private mosqpp::mosquittopp {
+public:
+ typedef std::function<void(const struct mosquitto_message *)> callback_t;
+
+ raw_mqtt_client(callback_t on_message_) : mosquittopp(),
+ on_message_(on_message_) {
+ cout << "Connecting to " << mqtt_host << ":" << mqtt_port << endl;
+ loop_start();
+ connect_async(mqtt_host.c_str(), mqtt_port, 10);
+ }
+
+ ~raw_mqtt_client() {
+ loop_stop(true);
+ disconnect();
+ }
+
+private:
+ callback_t on_message_;
+ bool subscribed = false;
+
+ void on_connect(int rc) override {
+ cout << "Connected to MQTT broker, rc=" << rc << endl;
+// should_run = false;
+ int qos = 0;
+ if (!subscribed) {
+ subscribed = true;
+ cout << "Subscribing..." << endl;
+ subscribe(nullptr, mqtt_topic.c_str(), qos);
+ }
+ }
+
+ void on_disconnect(int rc) override {
+ subscribed = false;
+
+ cout << "Oops, disconnected, rc=" << rc << endl;
+ }
+
+ void on_publish(int mid) override {
+ }
+
+ void on_message(const struct mosquitto_message *message) override {
+ string payload((const char *) message->payload, (size_t) message->payloadlen);
+ on_message_(message);
+ }
+
+ void on_subscribe(int mid, int qos_count, const int *granted_qos) override {
+ cout << "Subscribed" << endl;
+ }
+
+ void on_unsubscribe(int mid) override {
+ cout << "Oops, unsubscribed" << endl;
+ }
+
+ void on_log(int level, const char *str) override {
+ cout << "MQTT: " << level << ":" << str << endl;
+ }
+
+ void on_error() override {
+ cout << "Oops, error" << endl;
+ }
+};
+
+cassandra_future2 *insert_into_raw(unique_ptr<cassandra_session> &session, const SampleRecord &record) {
+ cassandra_statement q("INSERT INTO raw_record(day, timestamp, records) VALUES (?, ?, ?);", 3);
+
+ auto system_now = system_clock::now();
+
+ char day[100];
+ std::time_t t = system_clock::to_time_t(system_now);
+ std::strftime(day, sizeof(day), "%Y-%m-%d", std::localtime(&t));
+ q.bind(0, day);
+ cout << "day=" << day << endl;
+
+ auto now_ms = std::chrono::time_point_cast<std::chrono::milliseconds>(system_now);
+ long timestamp = now_ms.time_since_epoch().count();
+ q.bind(1, timestamp);
+ cout << "timestamp=" << timestamp << endl;
+
+ auto buf = make_shared<stringstream>();
+ auto output = trygvis::sensor::io::open_sample_output_stream(buf, record.dict, sample_format_type::KEY_VALUE);
+ output->write(record);
+
+ cassandra_collection c(CASS_COLLECTION_TYPE_LIST, record.dict.size());
+ for_each(record.dict.begin(), record.dict.end(), [&](const SampleKey *key) {
+ cassandra_tuple tuple(2);
+ tuple.set(0, key->name);
+ o<string> value = record.at(key);
+ if (value) {
+ tuple.set(1, value.get());
+ }
+ c.append(std::move(tuple));
+ });
+
+ q.bind(2, c);
+
+ return session->execute2(std::move(q));
+}
+
+void on_message(const struct mosquitto_message *message) {
+ string payload((const char *) message->payload, (size_t) message->payloadlen);
+ cout << "storing message: " << endl;
+ cout << payload << endl;
+ cout << "----------------------------------------" << endl;
+
+ KeyDictionary dict;
+ auto sample_buffer = make_shared<VectorSampleOutputStream>();
+ auto input = make_shared<KeyValueSampleStreamParser>(sample_buffer, dict);
+
+ mutable_buffers_1 buffer = boost::asio::buffer(message->payload, (std::size_t) message->payloadlen);
+
+ input->process(buffer);
+ input->finish();
+
+ cout << "sample_buffer->samples: " << sample_buffer->samples.size() << endl;
+
+ if (current_cassandra_session) {
+ std::for_each(sample_buffer->samples.cbegin(), sample_buffer->samples.cend(), [&](auto &sample) {
+ cout << "Sample: " << sample.to_string() << endl;
+
+ insert_into_raw(current_cassandra_session, sample)->then([&](cassandra_future2 &f) {
+ if (f)
+ cout << "Success!" << endl;
+ else {
+ cout << "Error: " << f.error_message() << endl;
+ }
+ });
+
+ cout << "sample insert scheduled" << endl;
+ });
+ } else {
+ cout << "Not connected to Cassandra" << endl;
+ }
+}
+
+int main(int argc, const char **argv) {
+ mqtt_lib mqtt_lib;
+ cassandra_logging cassandra_logging;
+
+ string cassandra_cluster;
+ po::options_description all("Options");
+ all.add_options()("cassandra-cluster", po::value<string>(&cassandra_cluster)->default_value("127.0.0.1"));
+ all.add_options()("mqtt-host", po::value<>(&mqtt_host)->default_value("trygvis.io"));
+ all.add_options()("mqtt-port", po::value<>(&mqtt_port)->default_value(1883));
+ all.add_options()("mqtt-topic", po::value<>(&mqtt_topic)->required());
+
+ po::variables_map vm;
+ try {
+ auto parsed = po::parse_command_line(argc, argv, all);
+ po::store(parsed, vm);
+ po::notify(vm);
+ auto unrecognized = po::collect_unrecognized(parsed.options, po::include_positional);
+
+ if (vm.count("help")) {
+ cerr << all << "\n";
+ return EXIT_FAILURE;
+ }
+
+ if (unrecognized.size()) {
+ cerr << "Unrecognized option: " << unrecognized.at(0) << "\n";
+ return EXIT_FAILURE;
+ }
+
+ } catch (po::required_option &e) {
+ cerr << "Missing required option: " << e.get_option_name() << endl;
+ cerr << all << endl;
+ } catch (po::unknown_option &e) {
+ cerr << e.what() << endl;
+ return EXIT_FAILURE;
+ }
+
+ CassCluster *cluster = cass_cluster_new();
+ auto session = make_unique<cassandra_session>();
+
+ cout << "Connecting to Cassandra at " << cassandra_cluster << endl;
+ cass_cluster_set_contact_points(cluster, cassandra_cluster.c_str());
+
+ auto connect_future = session->connect(cluster);
+
+ if (cass_future_error_code(connect_future) != CASS_OK) {
+ string s = cassandra_future::error_message(connect_future);
+ cerr << "Could not connect to Cassandra: " << s << endl;
+ return EXIT_FAILURE;
+ }
+
+ execute_query(session->underlying(), "USE " + keyspace_name);
+
+ cout << "Connected to Cassandra" << endl;
+ current_cassandra_session = std::move(session);
+
+ raw_mqtt_client mqtt_client(on_message);
+
+ should_run = true;
+
+ while (should_run) {
+ cout << "sleeping.." << endl;
+ std::this_thread::sleep_for(60s);
+ }
+
+ current_cassandra_session.release();
+
+ return 0;
+}
+
+}
+
+int main(int argc, const char **argv) {
+ return raw_mqtt_consumer::main(argc, argv);
+}
diff --git a/sm-http-server.cpp b/sm-http-server.cpp
index b1cd37e..6b3169d 100644
--- a/sm-http-server.cpp
+++ b/sm-http-server.cpp
@@ -185,7 +185,7 @@ int main(int argc, const char *const argv[]) {
auto connect_future = cass_session_connect(current_cassandra_session->underlying(), cluster);
if (cass_future_error_code(connect_future) != CASS_OK) {
- string s = error_message(connect_future);
+ string s = cassandra_future::error_message(connect_future);
cerr << "Could not connect to Cassandra: " << s << endl;
return EXIT_FAILURE;
}
diff --git a/sm-mqtt-consumer.cpp b/sm-mqtt-consumer.cpp
index e1f6801..b91b663 100644
--- a/sm-mqtt-consumer.cpp
+++ b/sm-mqtt-consumer.cpp
@@ -1,15 +1,19 @@
#include "cassandra_support.h"
-#include "mosquittopp.h"
-#include "trygvis/sensor/io.h"
+#include "mqtt_support.h"
+#include "misc_support.h"
#include <thread>
-#include <boost/lexical_cast.hpp>
#include <boost/program_options.hpp>
+#include <trygvis/sensor/io.h>
+
+namespace sm_mqtt_consumer {
using namespace std;
using namespace std::chrono;
using namespace trygvis::sensor;
using namespace trygvis::sensor::io;
using namespace trygvis::cassandra_support;
+using namespace trygvis::mqtt_support;
+using namespace trygvis::misc_support;
using namespace boost;
namespace po = boost::program_options;
@@ -53,29 +57,12 @@ struct device_measurement {
}
};
-string &&to_string(CassFuture *f) {
- const char *message;
- size_t message_length;
- cass_future_error_message(f, &message, &message_length);
-
- return std::move(string(message, message_length));
-}
-
-class mqtt_lib {
-public:
- mqtt_lib() {
- mosquitto_lib_init();
- }
-
- ~mqtt_lib() {
- mosquitto_lib_cleanup();
- }
-};
-
class mqtt_client : private mosqpp::mosquittopp {
public:
- mqtt_client(std::function<void(const struct mosquitto_message *)> on_message_) : mosquittopp(),
- on_message_(on_message_) {
+ typedef std::function<void(const struct mosquitto_message *)> callback_t;
+
+ mqtt_client(callback_t on_message_) : mosquittopp(),
+ on_message_(on_message_) {
cout << "Connecting to " << mqtt_broker_host << ":" << mqtt_broker_port << endl;
loop_start();
connect_async(mqtt_broker_host.c_str(), mqtt_broker_port, 10);
@@ -87,7 +74,7 @@ public:
}
private:
- std::function<void(const struct mosquitto_message *)> on_message_;
+ callback_t on_message_;
bool subscribed = false;
void on_connect(int rc) override {
@@ -132,10 +119,6 @@ private:
}
};
-void print_error(CassFuture *future) {
- cout << "Cassandra error: " << error_message(future) << endl;
-}
-
auto insert_into_sm_by_day(CassSession *session, device_measurement &&measurement) {
cassandra_statement q("INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);", 4);
@@ -161,33 +144,6 @@ auto insert_into_sm_by_day(CassSession *session, device_measurement &&measuremen
return cass_session_execute(session, q);
}
-template<typename Target, typename Source>
-boost::optional<Target> map(boost::optional<Source> &a, std::function<Target(Source)> f) {
- if (!a.is_initialized()) {
- return boost::none;
- }
-
- return make_optional(f(a));
-}
-
-template<typename Target, typename Source>
-boost::optional<Target> flat_map(boost::optional<Source> &a, boost::optional<Target> (&f)(Source)) {
- if (!a.is_initialized()) {
- return boost::none;
- }
-
- return f(a.get());
-}
-
-template<typename Target, typename Source = string>
-boost::optional<Target> l_c(const Source source) {
- try {
- return boost::lexical_cast<Target>(source);
- } catch (bad_lexical_cast &e) {
- return boost::none;
- }
-};
-
void on_message(const struct mosquitto_message *message) {
string payload((const char *) message->payload, (size_t) message->payloadlen);
cout << "storing message: " << endl;
@@ -255,7 +211,7 @@ void on_message(const struct mosquitto_message *message) {
handle_future(insert_into_sm_by_day((CassSession*) current_cassandra_session.get(), std::move(measurement)), [&](auto future) {
cout << "Success!" << endl;
}, [&](auto future, auto err) {
- cout << "Failure: " << error_message(future) << endl;
+ cout << "Failure: " << cassandra_future::error_message(future) << endl;
});
} else {
cout << "Not connected to Cassandra" << endl;
@@ -306,7 +262,7 @@ int main(int argc, const char **argv) {
connect_future = cass_session_connect((CassSession*) session.get(), cluster);
if (cass_future_error_code(connect_future) != CASS_OK) {
- string s = to_string(connect_future);
+ string s = cassandra_future::error_message(connect_future);
cerr << "Could not connect to Cassandra:" << s << endl;
return EXIT_FAILURE;
}
@@ -326,3 +282,9 @@ int main(int argc, const char **argv) {
return 0;
}
+
+}
+
+int main(int argc, const char **argv) {
+ return sm_mqtt_consumer::main(argc, argv);
+}