aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt10
-rw-r--r--mqtt_support.cpp12
-rw-r--r--mqtt_support.h206
-rw-r--r--raw-mqtt-consumer.cpp66
-rw-r--r--sm-mqtt-consumer.cpp1
5 files changed, 238 insertions, 57 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 95bcf89..05706d8 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -47,7 +47,10 @@ ExternalProject_Add(nghttp2
# TODO: proper discovery
# sm-mqtt-consumer
-add_executable(sm-mqtt-consumer sm-mqtt-consumer.cpp cassandra_support.h mqtt_support.h ${SHARED_SOURCES})
+add_executable(sm-mqtt-consumer sm-mqtt-consumer.cpp
+ cassandra_support.h
+ mqtt_support.cpp mqtt_support.h
+ ${SHARED_SOURCES})
add_dependencies(sm-mqtt-consumer cpp-driver)
## Boost
target_link_libraries(sm-mqtt-consumer PRIVATE ${Boost_LIBRARIES})
@@ -63,7 +66,10 @@ target_include_directories(sm-mqtt-consumer PRIVATE ${BLE_TOYS}/include)
target_link_libraries(sm-mqtt-consumer PRIVATE ${BLE_TOYS}/lib/trygvis/libtrygvis-sensor.a)
# raw-mqtt-consumer
-add_executable(raw-mqtt-consumer raw-mqtt-consumer.cpp cassandra_support.h mqtt_support.h ${SHARED_SOURCES})
+add_executable(raw-mqtt-consumer raw-mqtt-consumer.cpp
+ cassandra_support.h
+ mqtt_support.cpp mqtt_support.h
+ ${SHARED_SOURCES})
add_dependencies(raw-mqtt-consumer cpp-driver)
## Boost
target_link_libraries(raw-mqtt-consumer PRIVATE ${Boost_LIBRARIES})
diff --git a/mqtt_support.cpp b/mqtt_support.cpp
new file mode 100644
index 0000000..5f8404d
--- /dev/null
+++ b/mqtt_support.cpp
@@ -0,0 +1,12 @@
+#include "mqtt_support.h"
+
+namespace trygvis {
+namespace mqtt_support {
+
+using namespace std;
+
+int mqtt_client_instance_count = 0;
+mutex mqtt_client_mutex_;
+
+}
+}
diff --git a/mqtt_support.h b/mqtt_support.h
index 2ed5134..6b000e9 100644
--- a/mqtt_support.h
+++ b/mqtt_support.h
@@ -1,13 +1,17 @@
#ifndef TRYGVIS_MQTT_SUPPORT_H
#define TRYGVIS_MQTT_SUPPORT_H
-#include "mosquittopp.h"
+#include <mutex>
+#include <string>
+#include <exception>
+#include <iostream>
+#include <cstring>
#include "mosquitto.h"
namespace trygvis {
namespace mqtt_support {
-using namespace mosqpp;
+using namespace std;
class mqtt_lib {
public:
@@ -20,6 +24,204 @@ public:
}
};
+class mqtt_error : public std::runtime_error {
+
+public:
+ const int error;
+
+ mqtt_error(const string &what, int rc) : runtime_error(what), error(rc) {
+ }
+};
+
+enum mqtt_client_personality {
+ threaded,
+ polling
+};
+
+extern mutex mqtt_client_mutex_;
+extern int mqtt_client_instance_count;
+
+template<mqtt_client_personality personality>
+class mqtt_client {
+ template<bool>
+ struct personality_tag {
+ };
+
+ typedef personality_tag<mqtt_client_personality::threaded> threaded_tag;
+ typedef personality_tag<mqtt_client_personality::polling> polling_tag;
+ const personality_tag<personality> p_tag{};
+
+ struct mosquitto *mosquitto;
+
+ const string host;
+ const int port;
+ const int keep_alive;
+
+ void check(const string &function, int rc) {
+ if (rc != MOSQ_ERR_SUCCESS) {
+ throw mqtt_error(function + ": " + error_to_string(rc), rc);
+ }
+ }
+
+public:
+ mqtt_client(const string &host, const int port, const int keep_alive, const string &id, const bool clean_session) :
+ host(host), port(port), keep_alive(keep_alive) {
+ lock_guard <mutex> lock(mqtt_client_mutex_);
+
+ if (mqtt_client_instance_count++ == 0) {
+ mosquitto_lib_init();
+ }
+
+ mosquitto = mosquitto_new(id.size() ? id.c_str() : nullptr, clean_session, this);
+ if (!mosquitto) {
+ string err = strerror(errno);
+ throw runtime_error("Could not initialize mosquitto instance: " + err);
+ }
+ mosquitto_connect_callback_set(mosquitto, on_connect_cb);
+ mosquitto_disconnect_callback_set(mosquitto, on_disconnect_cb);
+ mosquitto_publish_callback_set(mosquitto, on_publish_cb);
+ mosquitto_message_callback_set(mosquitto, on_message_cb);
+ mosquitto_subscribe_callback_set(mosquitto, on_subscribe_cb);
+ mosquitto_unsubscribe_callback_set(mosquitto, on_unsubscribe_cb);
+ mosquitto_log_callback_set(mosquitto, on_log_cb);
+
+ post_construct(p_tag);
+ }
+
+private:
+ void post_construct(threaded_tag) {
+ cout << "mosquitto_loop_start" << endl;
+ int rc = mosquitto_loop_start(mosquitto);
+ check("mosquitto_loop_start", rc);
+ }
+
+ void post_construct(polling_tag) {
+ }
+
+public:
+
+ virtual ~mqtt_client() {
+ lock_guard <mutex> lock(mqtt_client_mutex_);
+
+ int rc = mosquitto_loop_stop(mosquitto, true);
+ if (!rc) {
+ cerr << "mosquitto_loop_stop: " << error_to_string(rc) << endl;
+ }
+ disconnect();
+
+ if (--mqtt_client_instance_count == 0) {
+ mosquitto_lib_init();
+ }
+ }
+
+ void connect() {
+ connect(p_tag);
+ }
+
+private:
+ void connect(threaded_tag) {
+ cout << "mosquitto_connect_async" << endl;
+ int rc = mosquitto_connect_async(mosquitto, host.c_str(), port, keep_alive);
+ check("mosquitto_connect_async", rc);
+ }
+
+ void connect(polling_tag) {
+ cout << "mosquitto_connect" << endl;
+ int rc = mosquitto_connect(mosquitto, host.c_str(), port, keep_alive);
+ check("mosquitto_connect", rc);
+ }
+
+public:
+ void disconnect() {
+ int rc = mosquitto_disconnect(mosquitto);
+ if (!rc) {
+ cerr << "mosquitto_disconnect: " << error_to_string(rc) << endl;
+ }
+ }
+
+ void subscribe(int *mid, const string &topic, int qos) {
+ int rc = mosquitto_subscribe(mosquitto, mid, topic.c_str(), qos);
+ check("mosquitto_subscribe", rc);
+ }
+
+ inline
+ string error_to_string(int rc) {
+ if (rc == MOSQ_ERR_ERRNO) {
+ return string(strerror(errno));
+ }
+ return string(mosquitto_strerror(rc));
+ }
+
+private:
+ void poll(threaded_tag) {
+ }
+
+ void poll(polling_tag) {
+ int rc = mosquitto_loop(mosquitto, 100, 1);
+ check("mosquitto_loop", rc);
+ }
+
+public:
+ void poll() {
+ poll(p_tag);
+ }
+
+ // -------------------------------------------
+ // Callbacks
+ // -------------------------------------------
+
+protected:
+ virtual void on_connect(int rc) {
+ }
+
+ virtual void on_disconnect(int rc) {
+ }
+
+ virtual void on_publish(int mid) {
+ }
+
+ virtual void on_message(const struct mosquitto_message *message) {
+ }
+
+ virtual void on_subscribe(int mid, int qos_count, const int *granted_qos) {
+ }
+
+ virtual void on_unsubscribe(int mid) {
+ }
+
+ virtual void on_log(int level, const char *str) {
+ }
+
+private:
+ static void on_connect_cb(struct mosquitto *m, void *self, int rc) {
+ static_cast<mqtt_client *>(self)->on_connect(rc);
+ }
+
+ static void on_disconnect_cb(struct mosquitto *m, void *self, int rc) {
+ static_cast<mqtt_client *>(self)->on_disconnect(rc);
+ }
+
+ static void on_publish_cb(struct mosquitto *m, void *self, int rc) {
+ static_cast<mqtt_client *>(self)->on_publish(rc);
+ }
+
+ static void on_message_cb(struct mosquitto *m, void *self, const mosquitto_message *message) {
+ static_cast<mqtt_client *>(self)->on_message(message);
+ }
+
+ static void on_subscribe_cb(struct mosquitto *m, void *self, int mid, int qos_count, const int *granted_qos) {
+ static_cast<mqtt_client *>(self)->on_subscribe(mid, qos_count, granted_qos);
+ }
+
+ static void on_unsubscribe_cb(struct mosquitto *m, void *self, int mid) {
+ static_cast<mqtt_client *>(self)->on_unsubscribe(mid);
+ }
+
+ static void on_log_cb(struct mosquitto *m, void *self, int level, const char *str) {
+ static_cast<mqtt_client *>(self)->on_log(level, str);
+ }
+};
+
}
}
diff --git a/raw-mqtt-consumer.cpp b/raw-mqtt-consumer.cpp
index 3cc43be..56b620e 100644
--- a/raw-mqtt-consumer.cpp
+++ b/raw-mqtt-consumer.cpp
@@ -19,46 +19,28 @@ static bool should_run;
static string mqtt_host;
static int mqtt_port;
static string mqtt_topic;
+static string mqtt_client_id;
+static bool mqtt_clean_session;
static string keyspace_name = "soil_moisture";
static unique_ptr<cassandra_session> current_cassandra_session;
-class raw_mqtt_client /*: private mosqpp::mosquittopp*/ {
+class raw_mqtt_client : public mqtt_client<mqtt_client_personality::polling> {
public:
typedef std::function<void(const struct mosquitto_message *)> callback_t;
- raw_mqtt_client(callback_t on_message_) : on_message_(on_message_) {
- cout << "Connecting to " << mqtt_host << ":" << mqtt_port << endl;
-
- mosquitto = mosquitto_new(nullptr, true, this);
- if(!mosquitto) {
- throw runtime_error("Could not initialize mosquitto instance");
- }
- mosquitto_connect_callback_set(mosquitto, on_connect_cb);
- mosquitto_disconnect_callback_set(mosquitto, on_disconnect_cb);
- mosquitto_publish_callback_set(mosquitto, on_publish_cb);
- mosquitto_message_callback_set(mosquitto, on_message_cb);
- mosquitto_subscribe_callback_set(mosquitto, on_subscribe_cb);
- mosquitto_unsubscribe_callback_set(mosquitto, on_unsubscribe_cb);
- mosquitto_log_callback_set(mosquitto, on_log_cb);
-
- mosquitto_loop_start(mosquitto);
- mosquitto_connect_async(mosquitto, mqtt_host.c_str(), mqtt_port, 10);
+ raw_mqtt_client(callback_t on_message_) : mqtt_client(mqtt_host, mqtt_port, 10, mqtt_client_id, mqtt_clean_session),
+ on_message_(on_message_) {
+ cout << "Connecting to MQTT broker at " << mqtt_host << ":" << mqtt_port << endl;
+ connect();
}
~raw_mqtt_client() {
- mosquitto_loop_stop(mosquitto, true);
- mosquitto_disconnect(mosquitto);
}
private:
callback_t on_message_;
bool subscribed = false;
- struct mosquitto *mosquitto;
-
- static void on_connect_cb(struct mosquitto *m, void *self, int rc) {
- static_cast<raw_mqtt_client *>(self)->on_connect(rc);
- }
void on_connect(int rc) {
cout << "Connected to MQTT broker, rc=" << rc << endl;
@@ -67,55 +49,31 @@ private:
if (!subscribed) {
subscribed = true;
cout << "Subscribing..." << endl;
- mosquitto_subscribe(mosquitto, nullptr, mqtt_topic.c_str(), qos);
+ subscribe(nullptr, mqtt_topic.c_str(), qos);
}
}
- static void on_disconnect_cb(struct mosquitto *m, void *self, int rc) {
- static_cast<raw_mqtt_client *>(self)->on_disconnect(rc);
- }
-
void on_disconnect(int rc) {
subscribed = false;
- cout << "Oops, disconnected, rc=" << rc << ":" << mosqpp::strerror(rc) << endl;
- }
-
- static void on_publish_cb(struct mosquitto *m, void *self, int rc) {
- static_cast<raw_mqtt_client *>(self)->on_publish(rc);
+ cout << "Oops, disconnected: " << error_to_string(rc) << endl;
}
void on_publish(int mid) {
}
- static void on_message_cb(struct mosquitto *m, void *self, const mosquitto_message *message) {
- static_cast<raw_mqtt_client *>(self)->on_message(message);
- }
-
void on_message(const struct mosquitto_message *message) {
on_message_(message);
}
- static void on_subscribe_cb(struct mosquitto *m, void *self, int mid, int qos_count, const int *granted_qos) {
- static_cast<raw_mqtt_client *>(self)->on_subscribe(mid, qos_count, granted_qos);
- }
-
void on_subscribe(int mid, int qos_count, const int *granted_qos) {
cout << "Subscribed" << endl;
}
- static void on_unsubscribe_cb(struct mosquitto *m, void *self, int mid) {
- static_cast<raw_mqtt_client *>(self)->on_unsubscribe(mid);
- }
-
void on_unsubscribe(int mid) {
cout << "Oops, unsubscribed" << endl;
}
- static void on_log_cb(struct mosquitto *m, void *self, int level, const char *str) {
- static_cast<raw_mqtt_client *>(self)->on_log(level, str);
- }
-
void on_log(int level, const char *str) {
cout << "MQTT: " << level << ":" << str << endl;
}
@@ -209,6 +167,8 @@ int main(int argc, const char **argv) {
all.add_options()("mqtt-host", po::value<>(&mqtt_host)->default_value("trygvis.io"));
all.add_options()("mqtt-port", po::value<>(&mqtt_port)->default_value(1883));
all.add_options()("mqtt-topic", po::value<>(&mqtt_topic)->required());
+ all.add_options()("mqtt-client-id", po::value<>(&mqtt_client_id));
+ all.add_options()("mqtt-clean-session", po::value<>(&mqtt_clean_session));
po::variables_map vm;
try {
@@ -256,14 +216,14 @@ int main(int argc, const char **argv) {
cout << "Connected to Cassandra" << endl;
current_cassandra_session = std::move(session);
- cout << "Connecting to MQTT broker" << endl;
raw_mqtt_client mqtt_client(on_message);
should_run = true;
while (should_run) {
cout << "sleeping.." << endl;
- std::this_thread::sleep_for(60s);
+ mqtt_client.poll();
+ std::this_thread::sleep_for(10s);
}
current_cassandra_session.release();
diff --git a/sm-mqtt-consumer.cpp b/sm-mqtt-consumer.cpp
index 778eddf..bd1b5ba 100644
--- a/sm-mqtt-consumer.cpp
+++ b/sm-mqtt-consumer.cpp
@@ -3,6 +3,7 @@
#include "misc_support.h"
#include <thread>
#include <trygvis/sensor/io.h>
+#include <mosquittopp.h>
namespace sm_mqtt_consumer {