aboutsummaryrefslogtreecommitdiff
path: root/apps/mqtt_support.h
diff options
context:
space:
mode:
Diffstat (limited to 'apps/mqtt_support.h')
-rw-r--r--apps/mqtt_support.h419
1 files changed, 419 insertions, 0 deletions
diff --git a/apps/mqtt_support.h b/apps/mqtt_support.h
new file mode 100644
index 0000000..48a6c39
--- /dev/null
+++ b/apps/mqtt_support.h
@@ -0,0 +1,419 @@
+#ifndef TRYGVIS_MQTT_SUPPORT_H
+#define TRYGVIS_MQTT_SUPPORT_H
+
+#include <mutex>
+#include <string>
+#include <exception>
+#include <cstring>
+#include <span.h>
+#include <log4cplus/logger.h>
+#include <log4cplus/loggingmacros.h>
+#include <atomic>
+#include <condition_variable>
+#include <limits.h>
+#include <experimental/optional>
+#include "mosquitto.h"
+
+namespace trygvis {
+namespace mqtt_support {
+
+template<typename T>
+using o = std::experimental::optional<T>;
+
+using namespace std;
+using namespace log4cplus;
+using namespace gsl;
+
+static inline
+string error_to_string(int rc) {
+ if (rc == MOSQ_ERR_ERRNO) {
+ return string(strerror(errno));
+ }
+ return string(mosquitto_strerror(rc));
+}
+
+class mqtt_error : public std::runtime_error {
+
+public:
+ const int error;
+
+ mqtt_error(const string &what, int rc) : runtime_error(what), error(rc) {
+ }
+};
+
+class mqtt_lib {
+public:
+ mqtt_lib() {
+ if (mqtt_client_instance_count++ == 0) {
+ lock_guard<mutex> l(mqtt_client_mutex_);
+ int rc = mosquitto_lib_init();
+
+ if (rc != MOSQ_ERR_SUCCESS) {
+ throw mqtt_error("Unable to initialize mosquitto: " + error_to_string(rc), rc);
+ }
+
+ mosquitto_lib_version(&version_major, &version_minor, &version_revision);
+ }
+ }
+
+ virtual ~mqtt_lib() {
+ if (--mqtt_client_instance_count == 0) {
+ lock_guard<mutex> l(mqtt_client_mutex_);
+
+ mosquitto_lib_cleanup();
+ }
+ }
+
+ static int version_major;
+ static int version_minor;
+ static int version_revision;
+
+private:
+ static atomic_int mqtt_client_instance_count;
+ static mutex mqtt_client_mutex_;
+};
+
+enum mqtt_client_personality {
+ threaded,
+ polling
+};
+
+template<mqtt_client_personality personality>
+class mqtt_client : private mqtt_lib {
+ template<bool>
+ struct personality_tag {
+ };
+
+ typedef personality_tag<mqtt_client_personality::threaded> threaded_tag;
+ typedef personality_tag<mqtt_client_personality::polling> polling_tag;
+ const personality_tag<personality> p_tag{};
+
+ struct mosquitto *mosquitto;
+
+ const string host;
+ const int port;
+ const int keep_alive;
+
+ recursive_mutex this_mutex;
+ using guard = lock_guard<recursive_mutex>;
+
+ bool connecting_, connected_;
+// bool should_reconnect_;
+
+ int unacked_messages_;
+ condition_variable cv;
+ mutex cv_mutex;
+
+ void assert_success(const string &function, int rc) {
+ if (rc != MOSQ_ERR_SUCCESS) {
+ throw mqtt_error(function + ": " + error_to_string(rc), rc);
+ }
+ }
+
+public:
+ mqtt_client(const string &host, const int port, const int keep_alive, const o<string> &client_id,
+ const bool clean_session) :
+ host(host), port(port), connecting_(false), connected_(false), /*should_reconnect_(false),*/
+ keep_alive(keep_alive), unacked_messages_(0) {
+ mosquitto = mosquitto_new(client_id ? (*client_id).c_str() : nullptr, clean_session, this);
+ if (!mosquitto) {
+ string err = strerror(errno);
+ throw runtime_error("Could not initialize mosquitto instance: " + err);
+ }
+ mosquitto_connect_callback_set(mosquitto, on_connect_cb);
+ mosquitto_disconnect_callback_set(mosquitto, on_disconnect_cb);
+ mosquitto_publish_callback_set(mosquitto, on_publish_cb);
+ mosquitto_message_callback_set(mosquitto, on_message_cb);
+ mosquitto_subscribe_callback_set(mosquitto, on_subscribe_cb);
+ mosquitto_unsubscribe_callback_set(mosquitto, on_unsubscribe_cb);
+ mosquitto_log_callback_set(mosquitto, on_log_cb);
+
+ post_construct(p_tag);
+ }
+
+private:
+ void post_construct(threaded_tag) {
+ LOG4CPLUS_INFO(logger, "mosquitto_loop_start");
+ int rc = mosquitto_loop_start(mosquitto);
+ assert_success("mosquitto_loop_start", rc);
+ }
+
+ void post_construct(polling_tag) {
+ }
+
+public:
+
+ virtual ~mqtt_client() {
+// should_reconnect_ = false;
+ pre_destruct(p_tag);
+
+ disconnect();
+ }
+
+private:
+ void pre_destruct(threaded_tag) {
+ int rc = mosquitto_loop_stop(mosquitto, true);
+ if (rc) {
+ LOG4CPLUS_WARN(logger, "mosquitto_loop_stop: " << error_to_string(rc));
+ }
+ }
+
+ void pre_destruct(polling_tag) {
+ }
+
+public:
+ void wait() {
+ unique_lock<mutex> lk(cv_mutex);
+ cv.wait(lk);
+ }
+
+ int unacked_messages() {
+ guard lock(this_mutex);
+ return unacked_messages_;
+ }
+
+ bool connected() {
+ guard lock(this_mutex);
+
+ return connected_;
+ }
+
+ bool connecting() {
+ guard lock(this_mutex);
+
+ return connecting_;
+ }
+
+ void connect() {
+ guard lock(this_mutex);
+
+ LOG4CPLUS_INFO(logger, "Connecting to " << host << ":" << port << ", keep_alive=" << keep_alive);
+
+ if (connecting_ || connected_) {
+ disconnect();
+ }
+
+ connect(p_tag);
+ }
+
+private:
+ void connect(threaded_tag) {
+ connecting_ = true;
+ connected_ = false;
+
+ LOG4CPLUS_DEBUG(logger, "mosquitto_connect_async");
+ int rc = mosquitto_connect_async(mosquitto, host.c_str(), port, keep_alive);
+ assert_success("mosquitto_connect_async", rc);
+ }
+
+ void connect(polling_tag) {
+ connecting_ = false;
+ connected_ = true;
+
+ LOG4CPLUS_DEBUG(logger, "mosquitto_connect");
+ int rc = mosquitto_connect(mosquitto, host.c_str(), port, keep_alive);
+ assert_success("mosquitto_connect", rc);
+ }
+
+private:
+ void on_connect_wrapper(int rc) {
+ guard lock(this_mutex);
+
+ connected_ = rc == MOSQ_ERR_SUCCESS;
+ connecting_ = false;
+
+ if (connected_) {
+ LOG4CPLUS_INFO(logger, "Connected");
+ } else {
+ LOG4CPLUS_INFO(logger, "Could not connect: " << error_to_string(rc));
+ }
+ on_connect(rc);
+
+ cv.notify_all();
+ }
+
+ void on_disconnect_wrapper(int rc) {
+ guard lock(this_mutex);
+
+ LOG4CPLUS_INFO(logger, "Disconnected");
+
+ bool was_connecting = connecting_, was_connected = connected_;
+ connecting_ = connected_ = false;
+ unacked_messages_ = 0;
+
+ on_disconnect(was_connecting, was_connected, rc);
+
+ cv.notify_all();
+
+// if (should_reconnect_) {
+// LOG4CPLUS_INFO(logger, "Disconnected, reconnecting. Error: " << error_to_string(rc));
+// this->connect();
+// if (rc != MOSQ_ERR_SUCCESS) {
+// LOG4CPLUS_WARN(logger, "Error when reconnecting: " << error_to_string(rc));
+// }
+// } else {
+// LOG4CPLUS_INFO(logger, "Disconnected");
+// }
+ }
+
+ void on_publish_wrapper(int message_id) {
+ guard lock(this_mutex);
+
+ LOG4CPLUS_DEBUG(logger, "message ACKed, message id=" << message_id);
+ unacked_messages_--;
+
+ on_publish(message_id);
+
+ cv.notify_all();
+ }
+
+ void on_message_wrapper(const struct mosquitto_message *message) {
+ guard lock(this_mutex);
+ on_message(message);
+ }
+
+ void on_subscribe_wrapper(int mid, int qos_count, const int *granted_qos) {
+ guard lock(this_mutex);
+ on_subscribe(mid, mid, granted_qos);
+ }
+
+ void on_unsubscribe_wrapper(int mid) {
+ guard lock(this_mutex);
+ on_unsubscribe(mid);
+ }
+
+ void on_log_wrapper(int level, const char *str) {
+ guard lock(this_mutex);
+
+ log4cplus::LogLevel l;
+
+ if (level == MOSQ_LOG_INFO) {
+ l = log4cplus::INFO_LOG_LEVEL;
+ } else if (level == MOSQ_LOG_NOTICE) {
+ l = log4cplus::INFO_LOG_LEVEL;
+ } else if (level == MOSQ_LOG_WARNING) {
+ l = log4cplus::WARN_LOG_LEVEL;
+ } else if (level == MOSQ_LOG_ERR) {
+ l = log4cplus::FATAL_LOG_LEVEL;
+ } else {
+ l = log4cplus::DEBUG_LOG_LEVEL;
+ }
+
+ if (logger.isEnabledFor(l)) {
+ log4cplus::tostringstream _log4cplus_buf;
+ _log4cplus_buf << "mosquitto: " << str;
+ logger.forcedLog(l, _log4cplus_buf.str());
+ }
+
+ on_log(level, str);
+ }
+
+public:
+ void disconnect() {
+ LOG4CPLUS_INFO(logger, "Disconnecting, connected: " << (connected() ? "yes" : "no"));
+ int rc = mosquitto_disconnect(mosquitto);
+ LOG4CPLUS_DEBUG(logger, "mosquitto_disconnect: " << error_to_string(rc));
+ }
+
+ void subscribe(int *mid, const string &topic, int qos) {
+ int rc = mosquitto_subscribe(mosquitto, mid, topic.c_str(), qos);
+ assert_success("mosquitto_subscribe", rc);
+ }
+
+ void publish(int *mid, const string &topic, int qos, bool retain, int payload_len, const void *payload) {
+// if (!connected_) {
+// throw mqtt_error("not connected", MOSQ_ERR_NO_CONN);
+// }
+
+ LOG4CPLUS_DEBUG(logger, "Publishing " << payload_len << " bytes to " << topic);
+
+ int rc = mosquitto_publish(mosquitto, mid, topic.c_str(), payload_len, payload, qos, retain);
+
+ if(rc == MOSQ_ERR_SUCCESS) {
+ guard lock(this_mutex);
+ unacked_messages_++;
+ }
+
+ assert_success("mosquitto_publish", rc);
+ }
+
+// void set_should_reconnect(bool should_reconnect) {
+// this->should_reconnect_ = should_reconnect;
+// }
+
+public:
+ void poll() {
+ poll(p_tag);
+ }
+
+private:
+ void poll(threaded_tag) {
+ }
+
+ void poll(polling_tag) {
+ int rc = mosquitto_loop(mosquitto, 100, 1);
+ assert_success("mosquitto_loop", rc);
+ }
+
+ // -------------------------------------------
+ // Callbacks
+ // -------------------------------------------
+
+protected:
+ virtual void on_connect(int rc) {
+ }
+
+ virtual void on_disconnect(bool was_connecting, bool was_connected, int rc) {
+ }
+
+ virtual void on_publish(int mid) {
+ }
+
+ virtual void on_message(const struct mosquitto_message *message) {
+ }
+
+ virtual void on_subscribe(int mid, int qos_count, const int *granted_qos) {
+ }
+
+ virtual void on_unsubscribe(int mid) {
+ }
+
+ virtual void on_log(int level, const char *str) {
+ }
+
+private:
+ static void on_connect_cb(struct mosquitto *m, void *self, int rc) {
+ static_cast<mqtt_client *>(self)->on_connect_wrapper(rc);
+ }
+
+ static void on_disconnect_cb(struct mosquitto *m, void *self, int rc) {
+ static_cast<mqtt_client *>(self)->on_disconnect_wrapper(rc);
+ }
+
+ static void on_publish_cb(struct mosquitto *m, void *self, int rc) {
+ static_cast<mqtt_client *>(self)->on_publish_wrapper(rc);
+ }
+
+ static void on_message_cb(struct mosquitto *m, void *self, const mosquitto_message *message) {
+ static_cast<mqtt_client *>(self)->on_message_wrapper(message);
+ }
+
+ static void on_subscribe_cb(struct mosquitto *m, void *self, int mid, int qos_count, const int *granted_qos) {
+ static_cast<mqtt_client *>(self)->on_subscribe_wrapper(mid, qos_count, granted_qos);
+ }
+
+ static void on_unsubscribe_cb(struct mosquitto *m, void *self, int mid) {
+ static_cast<mqtt_client *>(self)->on_unsubscribe_wrapper(mid);
+ }
+
+ static void on_log_cb(struct mosquitto *m, void *self, int level, const char *str) {
+ static_cast<mqtt_client *>(self)->on_log_wrapper(level, str);
+ }
+
+ Logger logger = Logger::getInstance(LOG4CPLUS_TEXT("mqtt_client"));
+};
+
+}
+}
+
+#endif