diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2015-08-02 16:27:10 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2015-08-02 16:27:10 +0200 |
commit | 06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42 (patch) | |
tree | bc272faec6c585693341a814c65483e86c56255b | |
parent | b632036b153297f83b10f6d960ccfe0c1772f00e (diff) | |
download | mqtt-cassandra-bridge-06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42.tar.gz mqtt-cassandra-bridge-06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42.tar.bz2 mqtt-cassandra-bridge-06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42.tar.xz mqtt-cassandra-bridge-06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42.zip |
o Adding a MQTT consumer that stores the parsed record in Cassandra.
-rw-r--r-- | .gitmodules | 0 | ||||
-rw-r--r-- | CMakeLists.txt | 28 | ||||
-rw-r--r-- | README.md | 13 | ||||
-rw-r--r-- | cassandra_support.h | 221 | ||||
-rw-r--r-- | misc_support.h | 49 | ||||
-rw-r--r-- | mqtt_support.h | 25 | ||||
-rw-r--r-- | raw-mqtt-consumer.cpp | 239 | ||||
-rw-r--r-- | sm-http-server.cpp | 2 | ||||
-rw-r--r-- | sm-mqtt-consumer.cpp | 78 |
9 files changed, 565 insertions, 90 deletions
diff --git a/.gitmodules b/.gitmodules deleted file mode 100644 index e69de29..0000000 --- a/.gitmodules +++ /dev/null diff --git a/CMakeLists.txt b/CMakeLists.txt index 75645c0..f63f023 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,6 +6,7 @@ find_package(Boost COMPONENTS system program_options unit_test_framework REQUIRE include(ExternalProject) set(SHARED_COMPILE_OPTIONS "-std=c++14") +set(SHARED_SOURCES misc_support.h) # Cassandra set(CPP_DRIVER ${CMAKE_CURRENT_BINARY_DIR}/cpp-driver) @@ -13,17 +14,17 @@ ExternalProject_Add(cpp-driver URL https://github.com/datastax/cpp-driver/archive/2.1.0-beta.tar.gz URL_MD5 d3cfde8731acc2f0f51ef9caf41068dc PREFIX ${CPP_DRIVER} - CMAKE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=<INSTALL_DIR> + CMAKE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=<INSTALL_DIR> -DCMAKE_C_COMPILER:FILE=${CMAKE_C_COMPILER} -DCMAKE_CXX_COMPILER:FILE=${CMAKE_CXX_COMPILER} ) # ble-toys set(BLE_TOYS ${CMAKE_CURRENT_BINARY_DIR}/ble-toys) ExternalProject_Add(ble-toys GIT_REPOSITORY https://trygvis.io/git/2015/02/ble-toys.git - GIT_TAG 650fb016ce36cfda2e8073764196655ee6a50567 + GIT_TAG f6493150c1a7172bcd8c9cc1790829285f707ee9 GIT_SUBMODULES json PREFIX ${BLE_TOYS} - CMAKE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=<INSTALL_DIR> + CMAKE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=<INSTALL_DIR> -DCMAKE_C_COMPILER:FILE=${CMAKE_C_COMPILER} -DCMAKE_CXX_COMPILER:FILE=${CMAKE_CXX_COMPILER} ) # nghttp2 @@ -45,7 +46,7 @@ ExternalProject_Add(nghttp2 # TODO: proper discovery # sm-mqtt-consumer -add_executable(sm-mqtt-consumer sm-mqtt-consumer.cpp cassandra_support.h) +add_executable(sm-mqtt-consumer sm-mqtt-consumer.cpp cassandra_support.h mqtt_support.h ${SHARED_SOURCES}) add_dependencies(sm-mqtt-consumer cpp-driver) ## Boost target_link_libraries(sm-mqtt-consumer PRIVATE ${Boost_LIBRARIES}) @@ -60,8 +61,24 @@ add_dependencies(sm-mqtt-consumer ble-toys) target_include_directories(sm-mqtt-consumer PRIVATE ${BLE_TOYS}/include) target_link_libraries(sm-mqtt-consumer PRIVATE ${BLE_TOYS}/lib/trygvis/libtrygvis-sensor.a) +# raw-mqtt-consumer +add_executable(raw-mqtt-consumer raw-mqtt-consumer.cpp cassandra_support.h mqtt_support.h ${SHARED_SOURCES}) +add_dependencies(raw-mqtt-consumer cpp-driver) +## Boost +target_link_libraries(raw-mqtt-consumer PRIVATE ${Boost_LIBRARIES}) +## Cassandra +target_include_directories(raw-mqtt-consumer PRIVATE ${CPP_DRIVER}/include) +target_link_libraries(raw-mqtt-consumer PRIVATE ${CPP_DRIVER}/lib/libcassandra.so) +## Mosquitto +target_compile_options(raw-mqtt-consumer PUBLIC ${SHARED_COMPILE_OPTIONS}) +target_link_libraries(raw-mqtt-consumer PRIVATE mosquitto mosquittopp) +## Ble toys +add_dependencies(raw-mqtt-consumer ble-toys) +target_include_directories(raw-mqtt-consumer PRIVATE ${BLE_TOYS}/include) +target_link_libraries(raw-mqtt-consumer PRIVATE ${BLE_TOYS}/lib/trygvis/libtrygvis-sensor.a) + # sm-http-server -add_executable(sm-http-server sm-http-server.cpp cassandra_support.h http_support.h) +add_executable(sm-http-server sm-http-server.cpp cassandra_support.h http_support.h ${SHARED_SOURCES}) target_compile_options(sm-http-server PUBLIC ${SHARED_COMPILE_OPTIONS}) ## Boost target_link_libraries(sm-http-server PRIVATE ${Boost_LIBRARIES}) @@ -76,6 +93,7 @@ target_link_libraries(sm-http-server PRIVATE ${NGHTTP2}/lib/libnghttp2_asio.a ${ ## Misc target_link_libraries(sm-http-server PRIVATE ssl crypto pthread) +# Testing enable_testing() add_executable(http-tests http-tests.cpp http_support.h) target_compile_options(http-tests PUBLIC ${SHARED_COMPILE_OPTIONS}) @@ -1,4 +1,4 @@ -# Schema +# Soil Moisture Schema CREATE TABLE sm_by_day ( device text, @@ -11,6 +11,17 @@ ) ); +# Raw Schema + + CREATE TABLE raw_record ( + day text, + timestamp timestamp, + records list<frozen<tuple<text, text>>>, + PRIMARY KEY ( + (day), + timestamp + ) + ); # Create Schema diff --git a/cassandra_support.h b/cassandra_support.h index 7b50296..3f49be2 100644 --- a/cassandra_support.h +++ b/cassandra_support.h @@ -1,14 +1,18 @@ #ifndef TRYGVIS_CASSANDRA_SUPPORT_H #define TRYGVIS_CASSANDRA_SUPPORT_H -#include <stddef.h> +#include <cassandra.h> + #include <string> +#include <cassert> #include <algorithm> -#include <cassandra.h> #include <stdexcept> #include <functional> #include <vector> #include <iostream> +#include <sstream> +#include <iomanip> +#include <mutex> namespace trygvis { namespace cassandra_support { @@ -17,7 +21,7 @@ using namespace std; class cassandra_error : public runtime_error { public: - cassandra_error(const string &context, string &&error) : runtime_error( + cassandra_error(const string &context, const string &error) : runtime_error( "Cassandra error: context=" + context + ", error=" + error) { } @@ -26,14 +30,7 @@ public: } }; -string error_message(CassFuture *future) { - const char *message; - size_t message_length; - cass_future_error_message(future, &message, &message_length); - return string(message, message_length); -} - -static CassError execute_query(CassSession *session, const string &&query) { +static CassError execute_query(CassSession *session, const string query) { CassStatement *statement = cass_statement_new(query.c_str(), 0); CassFuture *future = cass_session_execute(session, statement); @@ -82,7 +79,7 @@ protected: cassandra_wrapper &operator=(const cassandra_wrapper &) = delete; virtual ~cassandra_wrapper() { - underlying_ = (Underlying *)0xaabbccdd; + underlying_ = reinterpret_cast<Underlying *>(0xaabbccdd); } Underlying *underlying_; @@ -107,18 +104,155 @@ public: cassandra_result &operator=(const cassandra_result &) = delete; }; -static const CassResult *r; +class cassandra_future2 { +public: + // TODO: the values shouldn't be the future, but a specific result object instead + typedef void(callback_t)(cassandra_future2 &); + + static + cassandra_future2 *wrap(CassFuture *future) { + return new cassandra_future2(future); + } + +private: + typedef std::lock_guard<std::mutex> guard; + + callback_t *callback_ = nullptr; + CassFuture *future; + bool has_callback = false; + bool has_data = false; + + std::mutex mutex; + + cassandra_future2(CassFuture *future) : future(future), callback_(default_callback) { + cout << "cassandra_future2: this=" << std::hex << std::setw(8) << this << + ", future=" << std::hex << std::setw(8) << future << endl; + cass_future_set_callback(future, callback, this); + } + + static void default_callback(cassandra_future2 &) { + cout << "default_callback" << endl; + } + + ~cassandra_future2() { + cout << "~cassandra_future2: this=" << std::hex << std::setw(8) << this << endl; +// cout << std::hex << std::setw(8) << "freeing future: " << underlying_ << endl; +// cass_future_free(underlying_); + } + + cassandra_future2(const cassandra_future2 &) = delete; + + cassandra_future2 &operator=(const cassandra_future2 &) = delete; + +public: + void then(callback_t callback_) { + bool do_delete = false; + + { + guard lock(mutex); + + assert(callback_ != nullptr); + this->callback_ = callback_; + has_callback = true; + + if (has_data) { + cout << "Had early data" << endl; + this->callback(future); + + cout << "freeing future: " << std::hex << std::setw(8) << future << endl; + cass_future_free(future); + do_delete = true; + } + } + + if (do_delete) { + delete this; + } + } + + bool fetched = false; + + cassandra_result result() { + if (fetched) { + throw cassandra_error("cassandra_result::result()", "Already fetched"); + } + fetched = true; + const CassResult *x = cass_future_get_result(future); + size_t count = cass_result_row_count(x); + return std::move(cassandra_result(x)); + } + + static + string error_message(CassFuture * future) { + const char *message; + size_t message_length; + cass_future_error_message(future, &message, &message_length); + return string(message, message_length); + } + + string error_message() { + return error_message(future); + } + + CassError error_code() const { + return cass_future_error_code(future); + } -class cassandra_future; + bool operator!() const { + return !ok(); + } -typedef std::function<void(cassandra_future &)> callback_type; + operator bool() const { + return ok(); + } -struct tmp { - callback_type cb; + bool ok() const { + return error_code() == CASS_OK; + } + +private: + + static void callback(CassFuture *f, void *data) { + cassandra_future2 *c_f = static_cast<cassandra_future2 *>(data); + c_f->callback(f); + } + + void callback(CassFuture *future) { + bool do_delete = false; + + { + guard lock(mutex); + + cout << "cassandra_future::callback, error=" << cassandra_future2::error_message(future) << endl; + cout << "cassandra_future::callback, this=" << std::hex << std::setw(8) << (this) << endl; + CassError rc = cass_future_error_code(future); + + has_data = true; + if (has_callback) { + cout << "had callback already" << endl; + callback_(*this); + + cout << "freeing future: " << std::hex << std::setw(8) << future << endl; + cass_future_free(future); + + do_delete = true; + } + } + + if (do_delete) { + delete this; + } + } }; class cassandra_future : public cassandra_wrapper<CassFuture> { public: + typedef std::function<void(cassandra_future &)> callback_type; + + struct tmp { + cassandra_future::callback_type cb; + }; + cassandra_future(CassFuture *future) : cassandra_wrapper(future) { } @@ -140,11 +274,13 @@ public: return std::move(cassandra_result(x)); } + static + string error_message(CassFuture *future) { + return cassandra_future2::error_message(future); + } + string error_message() { - const char *message; - size_t message_length; - cass_future_error_message(underlying(), &message, &message_length); - return string(message, message_length); + return error_message(underlying()); } CassError error_code() const { @@ -185,10 +321,14 @@ public: cassandra_tuple &operator=(const cassandra_tuple &) = delete; - void set(size_t i, cass_int32_t value) { + void set(size_t i, const cass_int32_t value) { cass_tuple_set_int32(tuple, i, value); } + void set(size_t i, const string &s) { + cass_tuple_set_string_n(tuple, i, s.c_str(), s.length()); + } + CassTuple *tuple; }; @@ -317,10 +457,19 @@ public: cassandra_session &operator=(const cassandra_session &) = delete; - void execute(cassandra_statement &&stmt, callback_type cb_) { - auto statement = cass_statement_new("SELECT device, timestamp, sensors FROM sm_by_day", 0); + auto connect(CassCluster *cluster) { + return cass_session_connect(session, cluster); + } + + [[deprecated]] + void execute(cassandra_statement &&stmt, cassandra_future::callback_type cb_) { + auto future = cass_session_execute(session, stmt.underlying()); + cass_future_set_callback(future, cassandra_future::callback, new cassandra_future::tmp{cb_}); + } + + cassandra_future2 *execute2(cassandra_statement &&stmt) { auto future = cass_session_execute(session, stmt.underlying()); - cass_future_set_callback(future, cassandra_future::callback, new tmp{cb_}); + return cassandra_future2::wrap(future); } operator CassSession *() const { @@ -335,6 +484,28 @@ private: CassSession *session; }; +class cassandra_logging { +public: + cassandra_logging(CassLogLevel log_level = CASS_LOG_DEBUG) { + cass_log_set_level(log_level); + cass_log_set_callback(on_log, this); + } + + ~cassandra_logging() { + } + +private: + static + void on_log(const CassLogMessage *message, void *data) { + stringstream buf; + buf << message->time_ms << " " << cass_log_level_string(message->severity) << " " << + message->file << ":" << message->function << ":" << + message->line << ":" << message->message; + + cout << "CASSANDRA: " << buf.str() << endl; + } +}; + } } diff --git a/misc_support.h b/misc_support.h new file mode 100644 index 0000000..b00f6a8 --- /dev/null +++ b/misc_support.h @@ -0,0 +1,49 @@ +#ifndef SOIL_MOISTURE_MISC_SUPPORT_H +#define SOIL_MOISTURE_MISC_SUPPORT_H + +#include <boost/lexical_cast.hpp> +#include <boost/program_options.hpp> +#include <boost/optional.hpp> +#include <string> + +namespace trygvis { +namespace misc_support { + +using namespace boost; +using namespace std; +namespace po = boost::program_options; + +template<typename Target, typename Source> +static +boost::optional<Target> map(boost::optional<Source> &a, std::function<Target(Source)> f) { + if (!a.is_initialized()) { + return boost::none; + } + + return make_optional(f(a)); +} + +template<typename Target, typename Source> +static +boost::optional<Target> flat_map(boost::optional<Source> &a, boost::optional<Target> (&f)(Source)) { + if (!a.is_initialized()) { + return boost::none; + } + + return f(a.get()); +} + +template<typename Target, typename Source = string> +static +boost::optional<Target> l_c(const Source source) { + try { + return boost::lexical_cast<Target>(source); + } catch (bad_lexical_cast &e) { + return boost::none; + } +}; + +} +} + +#endif diff --git a/mqtt_support.h b/mqtt_support.h new file mode 100644 index 0000000..9a9f7fe --- /dev/null +++ b/mqtt_support.h @@ -0,0 +1,25 @@ +#ifndef TRYGVIS_MQTT_SUPPORT_H +#define TRYGVIS_MQTT_SUPPORT_H + +#include "mosquittopp.h" + +namespace trygvis { +namespace mqtt_support { + +using namespace mosqpp; + +class mqtt_lib { +public: + mqtt_lib() { + mosquitto_lib_init(); + } + + ~mqtt_lib() { + mosquitto_lib_cleanup(); + } +}; + +} +} + +#endif diff --git a/raw-mqtt-consumer.cpp b/raw-mqtt-consumer.cpp new file mode 100644 index 0000000..421d18c --- /dev/null +++ b/raw-mqtt-consumer.cpp @@ -0,0 +1,239 @@ +#include "cassandra_support.h" +#include "mqtt_support.h" +#include <thread> +#include <boost/lexical_cast.hpp> +#include <boost/program_options.hpp> +#include <trygvis/sensor.h> +#include <trygvis/sensor/io.h> + +namespace raw_mqtt_consumer { + +using namespace std; +using namespace std::chrono; +using namespace trygvis::cassandra_support; +using namespace trygvis::mqtt_support; +using namespace trygvis::sensor; +using namespace trygvis::sensor::io; +using namespace boost; +namespace po = boost::program_options; + +static bool should_run; +static string mqtt_host; +static int mqtt_port; +static string mqtt_topic; +static string keyspace_name = "soil_moisture"; + +static unique_ptr<cassandra_session> current_cassandra_session; + +struct measurement { + KeyDictionary dict; +}; + +class raw_mqtt_client : private mosqpp::mosquittopp { +public: + typedef std::function<void(const struct mosquitto_message *)> callback_t; + + raw_mqtt_client(callback_t on_message_) : mosquittopp(), + on_message_(on_message_) { + cout << "Connecting to " << mqtt_host << ":" << mqtt_port << endl; + loop_start(); + connect_async(mqtt_host.c_str(), mqtt_port, 10); + } + + ~raw_mqtt_client() { + loop_stop(true); + disconnect(); + } + +private: + callback_t on_message_; + bool subscribed = false; + + void on_connect(int rc) override { + cout << "Connected to MQTT broker, rc=" << rc << endl; +// should_run = false; + int qos = 0; + if (!subscribed) { + subscribed = true; + cout << "Subscribing..." << endl; + subscribe(nullptr, mqtt_topic.c_str(), qos); + } + } + + void on_disconnect(int rc) override { + subscribed = false; + + cout << "Oops, disconnected, rc=" << rc << endl; + } + + void on_publish(int mid) override { + } + + void on_message(const struct mosquitto_message *message) override { + string payload((const char *) message->payload, (size_t) message->payloadlen); + on_message_(message); + } + + void on_subscribe(int mid, int qos_count, const int *granted_qos) override { + cout << "Subscribed" << endl; + } + + void on_unsubscribe(int mid) override { + cout << "Oops, unsubscribed" << endl; + } + + void on_log(int level, const char *str) override { + cout << "MQTT: " << level << ":" << str << endl; + } + + void on_error() override { + cout << "Oops, error" << endl; + } +}; + +cassandra_future2 *insert_into_raw(unique_ptr<cassandra_session> &session, const SampleRecord &record) { + cassandra_statement q("INSERT INTO raw_record(day, timestamp, records) VALUES (?, ?, ?);", 3); + + auto system_now = system_clock::now(); + + char day[100]; + std::time_t t = system_clock::to_time_t(system_now); + std::strftime(day, sizeof(day), "%Y-%m-%d", std::localtime(&t)); + q.bind(0, day); + cout << "day=" << day << endl; + + auto now_ms = std::chrono::time_point_cast<std::chrono::milliseconds>(system_now); + long timestamp = now_ms.time_since_epoch().count(); + q.bind(1, timestamp); + cout << "timestamp=" << timestamp << endl; + + auto buf = make_shared<stringstream>(); + auto output = trygvis::sensor::io::open_sample_output_stream(buf, record.dict, sample_format_type::KEY_VALUE); + output->write(record); + + cassandra_collection c(CASS_COLLECTION_TYPE_LIST, record.dict.size()); + for_each(record.dict.begin(), record.dict.end(), [&](const SampleKey *key) { + cassandra_tuple tuple(2); + tuple.set(0, key->name); + o<string> value = record.at(key); + if (value) { + tuple.set(1, value.get()); + } + c.append(std::move(tuple)); + }); + + q.bind(2, c); + + return session->execute2(std::move(q)); +} + +void on_message(const struct mosquitto_message *message) { + string payload((const char *) message->payload, (size_t) message->payloadlen); + cout << "storing message: " << endl; + cout << payload << endl; + cout << "----------------------------------------" << endl; + + KeyDictionary dict; + auto sample_buffer = make_shared<VectorSampleOutputStream>(); + auto input = make_shared<KeyValueSampleStreamParser>(sample_buffer, dict); + + mutable_buffers_1 buffer = boost::asio::buffer(message->payload, (std::size_t) message->payloadlen); + + input->process(buffer); + input->finish(); + + cout << "sample_buffer->samples: " << sample_buffer->samples.size() << endl; + + if (current_cassandra_session) { + std::for_each(sample_buffer->samples.cbegin(), sample_buffer->samples.cend(), [&](auto &sample) { + cout << "Sample: " << sample.to_string() << endl; + + insert_into_raw(current_cassandra_session, sample)->then([&](cassandra_future2 &f) { + if (f) + cout << "Success!" << endl; + else { + cout << "Error: " << f.error_message() << endl; + } + }); + + cout << "sample insert scheduled" << endl; + }); + } else { + cout << "Not connected to Cassandra" << endl; + } +} + +int main(int argc, const char **argv) { + mqtt_lib mqtt_lib; + cassandra_logging cassandra_logging; + + string cassandra_cluster; + po::options_description all("Options"); + all.add_options()("cassandra-cluster", po::value<string>(&cassandra_cluster)->default_value("127.0.0.1")); + all.add_options()("mqtt-host", po::value<>(&mqtt_host)->default_value("trygvis.io")); + all.add_options()("mqtt-port", po::value<>(&mqtt_port)->default_value(1883)); + all.add_options()("mqtt-topic", po::value<>(&mqtt_topic)->required()); + + po::variables_map vm; + try { + auto parsed = po::parse_command_line(argc, argv, all); + po::store(parsed, vm); + po::notify(vm); + auto unrecognized = po::collect_unrecognized(parsed.options, po::include_positional); + + if (vm.count("help")) { + cerr << all << "\n"; + return EXIT_FAILURE; + } + + if (unrecognized.size()) { + cerr << "Unrecognized option: " << unrecognized.at(0) << "\n"; + return EXIT_FAILURE; + } + + } catch (po::required_option &e) { + cerr << "Missing required option: " << e.get_option_name() << endl; + cerr << all << endl; + } catch (po::unknown_option &e) { + cerr << e.what() << endl; + return EXIT_FAILURE; + } + + CassCluster *cluster = cass_cluster_new(); + auto session = make_unique<cassandra_session>(); + + cout << "Connecting to Cassandra at " << cassandra_cluster << endl; + cass_cluster_set_contact_points(cluster, cassandra_cluster.c_str()); + + auto connect_future = session->connect(cluster); + + if (cass_future_error_code(connect_future) != CASS_OK) { + string s = cassandra_future::error_message(connect_future); + cerr << "Could not connect to Cassandra: " << s << endl; + return EXIT_FAILURE; + } + + execute_query(session->underlying(), "USE " + keyspace_name); + + cout << "Connected to Cassandra" << endl; + current_cassandra_session = std::move(session); + + raw_mqtt_client mqtt_client(on_message); + + should_run = true; + + while (should_run) { + cout << "sleeping.." << endl; + std::this_thread::sleep_for(60s); + } + + current_cassandra_session.release(); + + return 0; +} + +} + +int main(int argc, const char **argv) { + return raw_mqtt_consumer::main(argc, argv); +} diff --git a/sm-http-server.cpp b/sm-http-server.cpp index b1cd37e..6b3169d 100644 --- a/sm-http-server.cpp +++ b/sm-http-server.cpp @@ -185,7 +185,7 @@ int main(int argc, const char *const argv[]) { auto connect_future = cass_session_connect(current_cassandra_session->underlying(), cluster); if (cass_future_error_code(connect_future) != CASS_OK) { - string s = error_message(connect_future); + string s = cassandra_future::error_message(connect_future); cerr << "Could not connect to Cassandra: " << s << endl; return EXIT_FAILURE; } diff --git a/sm-mqtt-consumer.cpp b/sm-mqtt-consumer.cpp index e1f6801..b91b663 100644 --- a/sm-mqtt-consumer.cpp +++ b/sm-mqtt-consumer.cpp @@ -1,15 +1,19 @@ #include "cassandra_support.h" -#include "mosquittopp.h" -#include "trygvis/sensor/io.h" +#include "mqtt_support.h" +#include "misc_support.h" #include <thread> -#include <boost/lexical_cast.hpp> #include <boost/program_options.hpp> +#include <trygvis/sensor/io.h> + +namespace sm_mqtt_consumer { using namespace std; using namespace std::chrono; using namespace trygvis::sensor; using namespace trygvis::sensor::io; using namespace trygvis::cassandra_support; +using namespace trygvis::mqtt_support; +using namespace trygvis::misc_support; using namespace boost; namespace po = boost::program_options; @@ -53,29 +57,12 @@ struct device_measurement { } }; -string &&to_string(CassFuture *f) { - const char *message; - size_t message_length; - cass_future_error_message(f, &message, &message_length); - - return std::move(string(message, message_length)); -} - -class mqtt_lib { -public: - mqtt_lib() { - mosquitto_lib_init(); - } - - ~mqtt_lib() { - mosquitto_lib_cleanup(); - } -}; - class mqtt_client : private mosqpp::mosquittopp { public: - mqtt_client(std::function<void(const struct mosquitto_message *)> on_message_) : mosquittopp(), - on_message_(on_message_) { + typedef std::function<void(const struct mosquitto_message *)> callback_t; + + mqtt_client(callback_t on_message_) : mosquittopp(), + on_message_(on_message_) { cout << "Connecting to " << mqtt_broker_host << ":" << mqtt_broker_port << endl; loop_start(); connect_async(mqtt_broker_host.c_str(), mqtt_broker_port, 10); @@ -87,7 +74,7 @@ public: } private: - std::function<void(const struct mosquitto_message *)> on_message_; + callback_t on_message_; bool subscribed = false; void on_connect(int rc) override { @@ -132,10 +119,6 @@ private: } }; -void print_error(CassFuture *future) { - cout << "Cassandra error: " << error_message(future) << endl; -} - auto insert_into_sm_by_day(CassSession *session, device_measurement &&measurement) { cassandra_statement q("INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);", 4); @@ -161,33 +144,6 @@ auto insert_into_sm_by_day(CassSession *session, device_measurement &&measuremen return cass_session_execute(session, q); } -template<typename Target, typename Source> -boost::optional<Target> map(boost::optional<Source> &a, std::function<Target(Source)> f) { - if (!a.is_initialized()) { - return boost::none; - } - - return make_optional(f(a)); -} - -template<typename Target, typename Source> -boost::optional<Target> flat_map(boost::optional<Source> &a, boost::optional<Target> (&f)(Source)) { - if (!a.is_initialized()) { - return boost::none; - } - - return f(a.get()); -} - -template<typename Target, typename Source = string> -boost::optional<Target> l_c(const Source source) { - try { - return boost::lexical_cast<Target>(source); - } catch (bad_lexical_cast &e) { - return boost::none; - } -}; - void on_message(const struct mosquitto_message *message) { string payload((const char *) message->payload, (size_t) message->payloadlen); cout << "storing message: " << endl; @@ -255,7 +211,7 @@ void on_message(const struct mosquitto_message *message) { handle_future(insert_into_sm_by_day((CassSession*) current_cassandra_session.get(), std::move(measurement)), [&](auto future) { cout << "Success!" << endl; }, [&](auto future, auto err) { - cout << "Failure: " << error_message(future) << endl; + cout << "Failure: " << cassandra_future::error_message(future) << endl; }); } else { cout << "Not connected to Cassandra" << endl; @@ -306,7 +262,7 @@ int main(int argc, const char **argv) { connect_future = cass_session_connect((CassSession*) session.get(), cluster); if (cass_future_error_code(connect_future) != CASS_OK) { - string s = to_string(connect_future); + string s = cassandra_future::error_message(connect_future); cerr << "Could not connect to Cassandra:" << s << endl; return EXIT_FAILURE; } @@ -326,3 +282,9 @@ int main(int argc, const char **argv) { return 0; } + +} + +int main(int argc, const char **argv) { + return sm_mqtt_consumer::main(argc, argv); +} |