diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2015-08-08 23:03:12 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2015-08-08 23:03:12 +0200 |
commit | e9847a2bc0781cb578dc9a0b9c469ecaf48fe584 (patch) | |
tree | b81d045c7683eba7542a54be35bbae94356fdb12 | |
parent | d2caf82ad16f8d31db6afdd69383ba1c04e02c32 (diff) | |
download | mqtt-cassandra-bridge-e9847a2bc0781cb578dc9a0b9c469ecaf48fe584.tar.gz mqtt-cassandra-bridge-e9847a2bc0781cb578dc9a0b9c469ecaf48fe584.tar.bz2 mqtt-cassandra-bridge-e9847a2bc0781cb578dc9a0b9c469ecaf48fe584.tar.xz mqtt-cassandra-bridge-e9847a2bc0781cb578dc9a0b9c469ecaf48fe584.zip |
-rw-r--r-- | CMakeLists.txt | 10 | ||||
-rw-r--r-- | mqtt_support.cpp | 12 | ||||
-rw-r--r-- | mqtt_support.h | 206 | ||||
-rw-r--r-- | raw-mqtt-consumer.cpp | 66 | ||||
-rw-r--r-- | sm-mqtt-consumer.cpp | 1 |
5 files changed, 238 insertions, 57 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 95bcf89..05706d8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -47,7 +47,10 @@ ExternalProject_Add(nghttp2 # TODO: proper discovery # sm-mqtt-consumer -add_executable(sm-mqtt-consumer sm-mqtt-consumer.cpp cassandra_support.h mqtt_support.h ${SHARED_SOURCES}) +add_executable(sm-mqtt-consumer sm-mqtt-consumer.cpp + cassandra_support.h + mqtt_support.cpp mqtt_support.h + ${SHARED_SOURCES}) add_dependencies(sm-mqtt-consumer cpp-driver) ## Boost target_link_libraries(sm-mqtt-consumer PRIVATE ${Boost_LIBRARIES}) @@ -63,7 +66,10 @@ target_include_directories(sm-mqtt-consumer PRIVATE ${BLE_TOYS}/include) target_link_libraries(sm-mqtt-consumer PRIVATE ${BLE_TOYS}/lib/trygvis/libtrygvis-sensor.a) # raw-mqtt-consumer -add_executable(raw-mqtt-consumer raw-mqtt-consumer.cpp cassandra_support.h mqtt_support.h ${SHARED_SOURCES}) +add_executable(raw-mqtt-consumer raw-mqtt-consumer.cpp + cassandra_support.h + mqtt_support.cpp mqtt_support.h + ${SHARED_SOURCES}) add_dependencies(raw-mqtt-consumer cpp-driver) ## Boost target_link_libraries(raw-mqtt-consumer PRIVATE ${Boost_LIBRARIES}) diff --git a/mqtt_support.cpp b/mqtt_support.cpp new file mode 100644 index 0000000..5f8404d --- /dev/null +++ b/mqtt_support.cpp @@ -0,0 +1,12 @@ +#include "mqtt_support.h" + +namespace trygvis { +namespace mqtt_support { + +using namespace std; + +int mqtt_client_instance_count = 0; +mutex mqtt_client_mutex_; + +} +} diff --git a/mqtt_support.h b/mqtt_support.h index 2ed5134..6b000e9 100644 --- a/mqtt_support.h +++ b/mqtt_support.h @@ -1,13 +1,17 @@ #ifndef TRYGVIS_MQTT_SUPPORT_H #define TRYGVIS_MQTT_SUPPORT_H -#include "mosquittopp.h" +#include <mutex> +#include <string> +#include <exception> +#include <iostream> +#include <cstring> #include "mosquitto.h" namespace trygvis { namespace mqtt_support { -using namespace mosqpp; +using namespace std; class mqtt_lib { public: @@ -20,6 +24,204 @@ public: } }; +class mqtt_error : public std::runtime_error { + +public: + const int error; + + mqtt_error(const string &what, int rc) : runtime_error(what), error(rc) { + } +}; + +enum mqtt_client_personality { + threaded, + polling +}; + +extern mutex mqtt_client_mutex_; +extern int mqtt_client_instance_count; + +template<mqtt_client_personality personality> +class mqtt_client { + template<bool> + struct personality_tag { + }; + + typedef personality_tag<mqtt_client_personality::threaded> threaded_tag; + typedef personality_tag<mqtt_client_personality::polling> polling_tag; + const personality_tag<personality> p_tag{}; + + struct mosquitto *mosquitto; + + const string host; + const int port; + const int keep_alive; + + void check(const string &function, int rc) { + if (rc != MOSQ_ERR_SUCCESS) { + throw mqtt_error(function + ": " + error_to_string(rc), rc); + } + } + +public: + mqtt_client(const string &host, const int port, const int keep_alive, const string &id, const bool clean_session) : + host(host), port(port), keep_alive(keep_alive) { + lock_guard <mutex> lock(mqtt_client_mutex_); + + if (mqtt_client_instance_count++ == 0) { + mosquitto_lib_init(); + } + + mosquitto = mosquitto_new(id.size() ? id.c_str() : nullptr, clean_session, this); + if (!mosquitto) { + string err = strerror(errno); + throw runtime_error("Could not initialize mosquitto instance: " + err); + } + mosquitto_connect_callback_set(mosquitto, on_connect_cb); + mosquitto_disconnect_callback_set(mosquitto, on_disconnect_cb); + mosquitto_publish_callback_set(mosquitto, on_publish_cb); + mosquitto_message_callback_set(mosquitto, on_message_cb); + mosquitto_subscribe_callback_set(mosquitto, on_subscribe_cb); + mosquitto_unsubscribe_callback_set(mosquitto, on_unsubscribe_cb); + mosquitto_log_callback_set(mosquitto, on_log_cb); + + post_construct(p_tag); + } + +private: + void post_construct(threaded_tag) { + cout << "mosquitto_loop_start" << endl; + int rc = mosquitto_loop_start(mosquitto); + check("mosquitto_loop_start", rc); + } + + void post_construct(polling_tag) { + } + +public: + + virtual ~mqtt_client() { + lock_guard <mutex> lock(mqtt_client_mutex_); + + int rc = mosquitto_loop_stop(mosquitto, true); + if (!rc) { + cerr << "mosquitto_loop_stop: " << error_to_string(rc) << endl; + } + disconnect(); + + if (--mqtt_client_instance_count == 0) { + mosquitto_lib_init(); + } + } + + void connect() { + connect(p_tag); + } + +private: + void connect(threaded_tag) { + cout << "mosquitto_connect_async" << endl; + int rc = mosquitto_connect_async(mosquitto, host.c_str(), port, keep_alive); + check("mosquitto_connect_async", rc); + } + + void connect(polling_tag) { + cout << "mosquitto_connect" << endl; + int rc = mosquitto_connect(mosquitto, host.c_str(), port, keep_alive); + check("mosquitto_connect", rc); + } + +public: + void disconnect() { + int rc = mosquitto_disconnect(mosquitto); + if (!rc) { + cerr << "mosquitto_disconnect: " << error_to_string(rc) << endl; + } + } + + void subscribe(int *mid, const string &topic, int qos) { + int rc = mosquitto_subscribe(mosquitto, mid, topic.c_str(), qos); + check("mosquitto_subscribe", rc); + } + + inline + string error_to_string(int rc) { + if (rc == MOSQ_ERR_ERRNO) { + return string(strerror(errno)); + } + return string(mosquitto_strerror(rc)); + } + +private: + void poll(threaded_tag) { + } + + void poll(polling_tag) { + int rc = mosquitto_loop(mosquitto, 100, 1); + check("mosquitto_loop", rc); + } + +public: + void poll() { + poll(p_tag); + } + + // ------------------------------------------- + // Callbacks + // ------------------------------------------- + +protected: + virtual void on_connect(int rc) { + } + + virtual void on_disconnect(int rc) { + } + + virtual void on_publish(int mid) { + } + + virtual void on_message(const struct mosquitto_message *message) { + } + + virtual void on_subscribe(int mid, int qos_count, const int *granted_qos) { + } + + virtual void on_unsubscribe(int mid) { + } + + virtual void on_log(int level, const char *str) { + } + +private: + static void on_connect_cb(struct mosquitto *m, void *self, int rc) { + static_cast<mqtt_client *>(self)->on_connect(rc); + } + + static void on_disconnect_cb(struct mosquitto *m, void *self, int rc) { + static_cast<mqtt_client *>(self)->on_disconnect(rc); + } + + static void on_publish_cb(struct mosquitto *m, void *self, int rc) { + static_cast<mqtt_client *>(self)->on_publish(rc); + } + + static void on_message_cb(struct mosquitto *m, void *self, const mosquitto_message *message) { + static_cast<mqtt_client *>(self)->on_message(message); + } + + static void on_subscribe_cb(struct mosquitto *m, void *self, int mid, int qos_count, const int *granted_qos) { + static_cast<mqtt_client *>(self)->on_subscribe(mid, qos_count, granted_qos); + } + + static void on_unsubscribe_cb(struct mosquitto *m, void *self, int mid) { + static_cast<mqtt_client *>(self)->on_unsubscribe(mid); + } + + static void on_log_cb(struct mosquitto *m, void *self, int level, const char *str) { + static_cast<mqtt_client *>(self)->on_log(level, str); + } +}; + } } diff --git a/raw-mqtt-consumer.cpp b/raw-mqtt-consumer.cpp index 3cc43be..56b620e 100644 --- a/raw-mqtt-consumer.cpp +++ b/raw-mqtt-consumer.cpp @@ -19,46 +19,28 @@ static bool should_run; static string mqtt_host; static int mqtt_port; static string mqtt_topic; +static string mqtt_client_id; +static bool mqtt_clean_session; static string keyspace_name = "soil_moisture"; static unique_ptr<cassandra_session> current_cassandra_session; -class raw_mqtt_client /*: private mosqpp::mosquittopp*/ { +class raw_mqtt_client : public mqtt_client<mqtt_client_personality::polling> { public: typedef std::function<void(const struct mosquitto_message *)> callback_t; - raw_mqtt_client(callback_t on_message_) : on_message_(on_message_) { - cout << "Connecting to " << mqtt_host << ":" << mqtt_port << endl; - - mosquitto = mosquitto_new(nullptr, true, this); - if(!mosquitto) { - throw runtime_error("Could not initialize mosquitto instance"); - } - mosquitto_connect_callback_set(mosquitto, on_connect_cb); - mosquitto_disconnect_callback_set(mosquitto, on_disconnect_cb); - mosquitto_publish_callback_set(mosquitto, on_publish_cb); - mosquitto_message_callback_set(mosquitto, on_message_cb); - mosquitto_subscribe_callback_set(mosquitto, on_subscribe_cb); - mosquitto_unsubscribe_callback_set(mosquitto, on_unsubscribe_cb); - mosquitto_log_callback_set(mosquitto, on_log_cb); - - mosquitto_loop_start(mosquitto); - mosquitto_connect_async(mosquitto, mqtt_host.c_str(), mqtt_port, 10); + raw_mqtt_client(callback_t on_message_) : mqtt_client(mqtt_host, mqtt_port, 10, mqtt_client_id, mqtt_clean_session), + on_message_(on_message_) { + cout << "Connecting to MQTT broker at " << mqtt_host << ":" << mqtt_port << endl; + connect(); } ~raw_mqtt_client() { - mosquitto_loop_stop(mosquitto, true); - mosquitto_disconnect(mosquitto); } private: callback_t on_message_; bool subscribed = false; - struct mosquitto *mosquitto; - - static void on_connect_cb(struct mosquitto *m, void *self, int rc) { - static_cast<raw_mqtt_client *>(self)->on_connect(rc); - } void on_connect(int rc) { cout << "Connected to MQTT broker, rc=" << rc << endl; @@ -67,55 +49,31 @@ private: if (!subscribed) { subscribed = true; cout << "Subscribing..." << endl; - mosquitto_subscribe(mosquitto, nullptr, mqtt_topic.c_str(), qos); + subscribe(nullptr, mqtt_topic.c_str(), qos); } } - static void on_disconnect_cb(struct mosquitto *m, void *self, int rc) { - static_cast<raw_mqtt_client *>(self)->on_disconnect(rc); - } - void on_disconnect(int rc) { subscribed = false; - cout << "Oops, disconnected, rc=" << rc << ":" << mosqpp::strerror(rc) << endl; - } - - static void on_publish_cb(struct mosquitto *m, void *self, int rc) { - static_cast<raw_mqtt_client *>(self)->on_publish(rc); + cout << "Oops, disconnected: " << error_to_string(rc) << endl; } void on_publish(int mid) { } - static void on_message_cb(struct mosquitto *m, void *self, const mosquitto_message *message) { - static_cast<raw_mqtt_client *>(self)->on_message(message); - } - void on_message(const struct mosquitto_message *message) { on_message_(message); } - static void on_subscribe_cb(struct mosquitto *m, void *self, int mid, int qos_count, const int *granted_qos) { - static_cast<raw_mqtt_client *>(self)->on_subscribe(mid, qos_count, granted_qos); - } - void on_subscribe(int mid, int qos_count, const int *granted_qos) { cout << "Subscribed" << endl; } - static void on_unsubscribe_cb(struct mosquitto *m, void *self, int mid) { - static_cast<raw_mqtt_client *>(self)->on_unsubscribe(mid); - } - void on_unsubscribe(int mid) { cout << "Oops, unsubscribed" << endl; } - static void on_log_cb(struct mosquitto *m, void *self, int level, const char *str) { - static_cast<raw_mqtt_client *>(self)->on_log(level, str); - } - void on_log(int level, const char *str) { cout << "MQTT: " << level << ":" << str << endl; } @@ -209,6 +167,8 @@ int main(int argc, const char **argv) { all.add_options()("mqtt-host", po::value<>(&mqtt_host)->default_value("trygvis.io")); all.add_options()("mqtt-port", po::value<>(&mqtt_port)->default_value(1883)); all.add_options()("mqtt-topic", po::value<>(&mqtt_topic)->required()); + all.add_options()("mqtt-client-id", po::value<>(&mqtt_client_id)); + all.add_options()("mqtt-clean-session", po::value<>(&mqtt_clean_session)); po::variables_map vm; try { @@ -256,14 +216,14 @@ int main(int argc, const char **argv) { cout << "Connected to Cassandra" << endl; current_cassandra_session = std::move(session); - cout << "Connecting to MQTT broker" << endl; raw_mqtt_client mqtt_client(on_message); should_run = true; while (should_run) { cout << "sleeping.." << endl; - std::this_thread::sleep_for(60s); + mqtt_client.poll(); + std::this_thread::sleep_for(10s); } current_cassandra_session.release(); diff --git a/sm-mqtt-consumer.cpp b/sm-mqtt-consumer.cpp index 778eddf..bd1b5ba 100644 --- a/sm-mqtt-consumer.cpp +++ b/sm-mqtt-consumer.cpp @@ -3,6 +3,7 @@ #include "misc_support.h" #include <thread> #include <trygvis/sensor/io.h> +#include <mosquittopp.h> namespace sm_mqtt_consumer { |