From 8ded9e3d0bbc2d7cdc5b9f01b4fed9c8685caf82 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 14 Feb 2016 14:41:52 +0100 Subject: mqtt: Using mqtt_support utilities from the mqtt-cassandra bridge. --- apps/mqtt_support.h | 419 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 419 insertions(+) create mode 100644 apps/mqtt_support.h (limited to 'apps/mqtt_support.h') diff --git a/apps/mqtt_support.h b/apps/mqtt_support.h new file mode 100644 index 0000000..48a6c39 --- /dev/null +++ b/apps/mqtt_support.h @@ -0,0 +1,419 @@ +#ifndef TRYGVIS_MQTT_SUPPORT_H +#define TRYGVIS_MQTT_SUPPORT_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "mosquitto.h" + +namespace trygvis { +namespace mqtt_support { + +template +using o = std::experimental::optional; + +using namespace std; +using namespace log4cplus; +using namespace gsl; + +static inline +string error_to_string(int rc) { + if (rc == MOSQ_ERR_ERRNO) { + return string(strerror(errno)); + } + return string(mosquitto_strerror(rc)); +} + +class mqtt_error : public std::runtime_error { + +public: + const int error; + + mqtt_error(const string &what, int rc) : runtime_error(what), error(rc) { + } +}; + +class mqtt_lib { +public: + mqtt_lib() { + if (mqtt_client_instance_count++ == 0) { + lock_guard l(mqtt_client_mutex_); + int rc = mosquitto_lib_init(); + + if (rc != MOSQ_ERR_SUCCESS) { + throw mqtt_error("Unable to initialize mosquitto: " + error_to_string(rc), rc); + } + + mosquitto_lib_version(&version_major, &version_minor, &version_revision); + } + } + + virtual ~mqtt_lib() { + if (--mqtt_client_instance_count == 0) { + lock_guard l(mqtt_client_mutex_); + + mosquitto_lib_cleanup(); + } + } + + static int version_major; + static int version_minor; + static int version_revision; + +private: + static atomic_int mqtt_client_instance_count; + static mutex mqtt_client_mutex_; +}; + +enum mqtt_client_personality { + threaded, + polling +}; + +template +class mqtt_client : private mqtt_lib { + template + struct personality_tag { + }; + + typedef personality_tag threaded_tag; + typedef personality_tag polling_tag; + const personality_tag p_tag{}; + + struct mosquitto *mosquitto; + + const string host; + const int port; + const int keep_alive; + + recursive_mutex this_mutex; + using guard = lock_guard; + + bool connecting_, connected_; +// bool should_reconnect_; + + int unacked_messages_; + condition_variable cv; + mutex cv_mutex; + + void assert_success(const string &function, int rc) { + if (rc != MOSQ_ERR_SUCCESS) { + throw mqtt_error(function + ": " + error_to_string(rc), rc); + } + } + +public: + mqtt_client(const string &host, const int port, const int keep_alive, const o &client_id, + const bool clean_session) : + host(host), port(port), connecting_(false), connected_(false), /*should_reconnect_(false),*/ + keep_alive(keep_alive), unacked_messages_(0) { + mosquitto = mosquitto_new(client_id ? (*client_id).c_str() : nullptr, clean_session, this); + if (!mosquitto) { + string err = strerror(errno); + throw runtime_error("Could not initialize mosquitto instance: " + err); + } + mosquitto_connect_callback_set(mosquitto, on_connect_cb); + mosquitto_disconnect_callback_set(mosquitto, on_disconnect_cb); + mosquitto_publish_callback_set(mosquitto, on_publish_cb); + mosquitto_message_callback_set(mosquitto, on_message_cb); + mosquitto_subscribe_callback_set(mosquitto, on_subscribe_cb); + mosquitto_unsubscribe_callback_set(mosquitto, on_unsubscribe_cb); + mosquitto_log_callback_set(mosquitto, on_log_cb); + + post_construct(p_tag); + } + +private: + void post_construct(threaded_tag) { + LOG4CPLUS_INFO(logger, "mosquitto_loop_start"); + int rc = mosquitto_loop_start(mosquitto); + assert_success("mosquitto_loop_start", rc); + } + + void post_construct(polling_tag) { + } + +public: + + virtual ~mqtt_client() { +// should_reconnect_ = false; + pre_destruct(p_tag); + + disconnect(); + } + +private: + void pre_destruct(threaded_tag) { + int rc = mosquitto_loop_stop(mosquitto, true); + if (rc) { + LOG4CPLUS_WARN(logger, "mosquitto_loop_stop: " << error_to_string(rc)); + } + } + + void pre_destruct(polling_tag) { + } + +public: + void wait() { + unique_lock lk(cv_mutex); + cv.wait(lk); + } + + int unacked_messages() { + guard lock(this_mutex); + return unacked_messages_; + } + + bool connected() { + guard lock(this_mutex); + + return connected_; + } + + bool connecting() { + guard lock(this_mutex); + + return connecting_; + } + + void connect() { + guard lock(this_mutex); + + LOG4CPLUS_INFO(logger, "Connecting to " << host << ":" << port << ", keep_alive=" << keep_alive); + + if (connecting_ || connected_) { + disconnect(); + } + + connect(p_tag); + } + +private: + void connect(threaded_tag) { + connecting_ = true; + connected_ = false; + + LOG4CPLUS_DEBUG(logger, "mosquitto_connect_async"); + int rc = mosquitto_connect_async(mosquitto, host.c_str(), port, keep_alive); + assert_success("mosquitto_connect_async", rc); + } + + void connect(polling_tag) { + connecting_ = false; + connected_ = true; + + LOG4CPLUS_DEBUG(logger, "mosquitto_connect"); + int rc = mosquitto_connect(mosquitto, host.c_str(), port, keep_alive); + assert_success("mosquitto_connect", rc); + } + +private: + void on_connect_wrapper(int rc) { + guard lock(this_mutex); + + connected_ = rc == MOSQ_ERR_SUCCESS; + connecting_ = false; + + if (connected_) { + LOG4CPLUS_INFO(logger, "Connected"); + } else { + LOG4CPLUS_INFO(logger, "Could not connect: " << error_to_string(rc)); + } + on_connect(rc); + + cv.notify_all(); + } + + void on_disconnect_wrapper(int rc) { + guard lock(this_mutex); + + LOG4CPLUS_INFO(logger, "Disconnected"); + + bool was_connecting = connecting_, was_connected = connected_; + connecting_ = connected_ = false; + unacked_messages_ = 0; + + on_disconnect(was_connecting, was_connected, rc); + + cv.notify_all(); + +// if (should_reconnect_) { +// LOG4CPLUS_INFO(logger, "Disconnected, reconnecting. Error: " << error_to_string(rc)); +// this->connect(); +// if (rc != MOSQ_ERR_SUCCESS) { +// LOG4CPLUS_WARN(logger, "Error when reconnecting: " << error_to_string(rc)); +// } +// } else { +// LOG4CPLUS_INFO(logger, "Disconnected"); +// } + } + + void on_publish_wrapper(int message_id) { + guard lock(this_mutex); + + LOG4CPLUS_DEBUG(logger, "message ACKed, message id=" << message_id); + unacked_messages_--; + + on_publish(message_id); + + cv.notify_all(); + } + + void on_message_wrapper(const struct mosquitto_message *message) { + guard lock(this_mutex); + on_message(message); + } + + void on_subscribe_wrapper(int mid, int qos_count, const int *granted_qos) { + guard lock(this_mutex); + on_subscribe(mid, mid, granted_qos); + } + + void on_unsubscribe_wrapper(int mid) { + guard lock(this_mutex); + on_unsubscribe(mid); + } + + void on_log_wrapper(int level, const char *str) { + guard lock(this_mutex); + + log4cplus::LogLevel l; + + if (level == MOSQ_LOG_INFO) { + l = log4cplus::INFO_LOG_LEVEL; + } else if (level == MOSQ_LOG_NOTICE) { + l = log4cplus::INFO_LOG_LEVEL; + } else if (level == MOSQ_LOG_WARNING) { + l = log4cplus::WARN_LOG_LEVEL; + } else if (level == MOSQ_LOG_ERR) { + l = log4cplus::FATAL_LOG_LEVEL; + } else { + l = log4cplus::DEBUG_LOG_LEVEL; + } + + if (logger.isEnabledFor(l)) { + log4cplus::tostringstream _log4cplus_buf; + _log4cplus_buf << "mosquitto: " << str; + logger.forcedLog(l, _log4cplus_buf.str()); + } + + on_log(level, str); + } + +public: + void disconnect() { + LOG4CPLUS_INFO(logger, "Disconnecting, connected: " << (connected() ? "yes" : "no")); + int rc = mosquitto_disconnect(mosquitto); + LOG4CPLUS_DEBUG(logger, "mosquitto_disconnect: " << error_to_string(rc)); + } + + void subscribe(int *mid, const string &topic, int qos) { + int rc = mosquitto_subscribe(mosquitto, mid, topic.c_str(), qos); + assert_success("mosquitto_subscribe", rc); + } + + void publish(int *mid, const string &topic, int qos, bool retain, int payload_len, const void *payload) { +// if (!connected_) { +// throw mqtt_error("not connected", MOSQ_ERR_NO_CONN); +// } + + LOG4CPLUS_DEBUG(logger, "Publishing " << payload_len << " bytes to " << topic); + + int rc = mosquitto_publish(mosquitto, mid, topic.c_str(), payload_len, payload, qos, retain); + + if(rc == MOSQ_ERR_SUCCESS) { + guard lock(this_mutex); + unacked_messages_++; + } + + assert_success("mosquitto_publish", rc); + } + +// void set_should_reconnect(bool should_reconnect) { +// this->should_reconnect_ = should_reconnect; +// } + +public: + void poll() { + poll(p_tag); + } + +private: + void poll(threaded_tag) { + } + + void poll(polling_tag) { + int rc = mosquitto_loop(mosquitto, 100, 1); + assert_success("mosquitto_loop", rc); + } + + // ------------------------------------------- + // Callbacks + // ------------------------------------------- + +protected: + virtual void on_connect(int rc) { + } + + virtual void on_disconnect(bool was_connecting, bool was_connected, int rc) { + } + + virtual void on_publish(int mid) { + } + + virtual void on_message(const struct mosquitto_message *message) { + } + + virtual void on_subscribe(int mid, int qos_count, const int *granted_qos) { + } + + virtual void on_unsubscribe(int mid) { + } + + virtual void on_log(int level, const char *str) { + } + +private: + static void on_connect_cb(struct mosquitto *m, void *self, int rc) { + static_cast(self)->on_connect_wrapper(rc); + } + + static void on_disconnect_cb(struct mosquitto *m, void *self, int rc) { + static_cast(self)->on_disconnect_wrapper(rc); + } + + static void on_publish_cb(struct mosquitto *m, void *self, int rc) { + static_cast(self)->on_publish_wrapper(rc); + } + + static void on_message_cb(struct mosquitto *m, void *self, const mosquitto_message *message) { + static_cast(self)->on_message_wrapper(message); + } + + static void on_subscribe_cb(struct mosquitto *m, void *self, int mid, int qos_count, const int *granted_qos) { + static_cast(self)->on_subscribe_wrapper(mid, qos_count, granted_qos); + } + + static void on_unsubscribe_cb(struct mosquitto *m, void *self, int mid) { + static_cast(self)->on_unsubscribe_wrapper(mid); + } + + static void on_log_cb(struct mosquitto *m, void *self, int level, const char *str) { + static_cast(self)->on_log_wrapper(level, str); + } + + Logger logger = Logger::getInstance(LOG4CPLUS_TEXT("mqtt_client")); +}; + +} +} + +#endif -- cgit v1.2.3