#ifndef TRYGVIS_MQTT_SUPPORT_H #define TRYGVIS_MQTT_SUPPORT_H #include #include #include #include #include #include #include #include #include #include #include #include "mosquitto.h" #include "apps.h" namespace trygvis { namespace mqtt_support { using namespace trygvis::apps; using namespace std; using namespace log4cplus; static inline string error_to_string(int rc) { if (rc == MOSQ_ERR_ERRNO) { return string(strerror(errno)); } return string(mosquitto_strerror(rc)); } static vector mqtt_tokenize_topic(string path) { char **topics; int topic_count; int i; mosquitto_sub_topic_tokenise(path.c_str(), &topics, &topic_count); vector res; for (i = 0; i < topic_count; i++) { if (topics[i] != NULL) { res.emplace_back(topics[i]); } } mosquitto_sub_topic_tokens_free(&topics, topic_count); return res; } class mqtt_error : public std::runtime_error { public: const int error; mqtt_error(const string &what, int rc) : runtime_error(what), error(rc) { } }; class mqtt_lib { public: mqtt_lib() { if (mqtt_client_instance_count++ == 0) { lock_guard l(mqtt_client_mutex_); int rc = mosquitto_lib_init(); if (rc != MOSQ_ERR_SUCCESS) { throw mqtt_error("Unable to initialize mosquitto: " + error_to_string(rc), rc); } mosquitto_lib_version(&version_major, &version_minor, &version_revision); } } virtual ~mqtt_lib() { if (--mqtt_client_instance_count == 0) { lock_guard l(mqtt_client_mutex_); mosquitto_lib_cleanup(); } } static int version_major; static int version_minor; static int version_revision; private: static atomic_int mqtt_client_instance_count; static mutex mqtt_client_mutex_; }; enum mqtt_client_personality { threaded, polling }; template class mqtt_client : public waitable, private mqtt_lib { template struct personality_tag { }; typedef personality_tag threaded_tag; typedef personality_tag polling_tag; const personality_tag p_tag{}; struct mosquitto *mosquitto; const string host; const int port; const int keep_alive; recursive_mutex this_mutex; using guard = lock_guard; bool connecting_, connected_; // bool should_reconnect_; int unacked_messages_; // condition_variable cv; // mutex cv_mutex; void assert_success(const string &function, int rc) { if (rc != MOSQ_ERR_SUCCESS) { throw mqtt_error(function + ": " + error_to_string(rc), rc); } } public: mqtt_client(const string &host, const int port, const int keep_alive, const o &client_id, const bool clean_session) : host(host), port(port), connecting_(false), connected_(false), /*should_reconnect_(false),*/ keep_alive(keep_alive), unacked_messages_(0) { const char *id = nullptr; if (client_id) { id = client_id->c_str(); } else { if (!clean_session) { throw mqtt_error("If client id is not specified, clean session must be true", MOSQ_ERR_INVAL); } } mosquitto = mosquitto_new(id, clean_session, this); if (!mosquitto) { string err = strerror(errno); throw runtime_error("Could not initialize mosquitto instance: " + err); } mosquitto_connect_callback_set(mosquitto, on_connect_cb); mosquitto_disconnect_callback_set(mosquitto, on_disconnect_cb); mosquitto_publish_callback_set(mosquitto, on_publish_cb); mosquitto_message_callback_set(mosquitto, on_message_cb); mosquitto_subscribe_callback_set(mosquitto, on_subscribe_cb); mosquitto_unsubscribe_callback_set(mosquitto, on_unsubscribe_cb); mosquitto_log_callback_set(mosquitto, on_log_cb); post_construct(p_tag); } private: void post_construct(threaded_tag) { LOG4CPLUS_INFO(logger, "mosquitto_loop_start"); int rc = mosquitto_loop_start(mosquitto); assert_success("mosquitto_loop_start", rc); } void post_construct(polling_tag) { } public: virtual ~mqtt_client() { // should_reconnect_ = false; pre_destruct(p_tag); disconnect(); } private: void pre_destruct(threaded_tag) { int rc = mosquitto_loop_stop(mosquitto, true); if (rc) { LOG4CPLUS_WARN(logger, "mosquitto_loop_stop: " << error_to_string(rc)); } } void pre_destruct(polling_tag) { } public: int unacked_messages() { guard lock(this_mutex); return unacked_messages_; } bool connected() { guard lock(this_mutex); return connected_; } bool connecting() { guard lock(this_mutex); return connecting_; } void connect() { guard lock(this_mutex); LOG4CPLUS_INFO(logger, "Connecting to " << host << ":" << port << ", keep_alive=" << keep_alive); if (connecting_ || connected_) { disconnect(); } connect(p_tag); } private: void connect(threaded_tag) { connecting_ = true; connected_ = false; LOG4CPLUS_DEBUG(logger, "mosquitto_connect_async"); int rc = mosquitto_connect_async(mosquitto, host.c_str(), port, keep_alive); assert_success("mosquitto_connect_async", rc); } void connect(polling_tag) { connecting_ = false; connected_ = true; LOG4CPLUS_DEBUG(logger, "mosquitto_connect"); int rc = mosquitto_connect(mosquitto, host.c_str(), port, keep_alive); assert_success("mosquitto_connect", rc); } private: void on_connect_wrapper(int rc) { guard lock(this_mutex); connected_ = rc == MOSQ_ERR_SUCCESS; connecting_ = false; if (connected_) { LOG4CPLUS_INFO(logger, "Connected"); } else { LOG4CPLUS_INFO(logger, "Could not connect: " << error_to_string(rc)); } on_connect(rc); cv.notify_all(); } void on_disconnect_wrapper(int rc) { guard lock(this_mutex); LOG4CPLUS_INFO(logger, "Disconnected, rc=" << error_to_string(rc)); bool was_connecting = connecting_, was_connected = connected_; connecting_ = connected_ = false; unacked_messages_ = 0; on_disconnect(was_connecting, was_connected, rc); cv.notify_all(); // if (should_reconnect_) { // LOG4CPLUS_INFO(logger, "Disconnected, reconnecting. Error: " << error_to_string(rc)); // this->connect(); // if (rc != MOSQ_ERR_SUCCESS) { // LOG4CPLUS_WARN(logger, "Error when reconnecting: " << error_to_string(rc)); // } // } else { // LOG4CPLUS_INFO(logger, "Disconnected"); // } } void on_publish_wrapper(int message_id) { guard lock(this_mutex); LOG4CPLUS_DEBUG(logger, "message ACKed, message id=" << message_id); unacked_messages_--; on_publish(message_id); cv.notify_all(); } void on_message_wrapper(const struct mosquitto_message *message) { guard lock(this_mutex); on_message(message); } void on_subscribe_wrapper(int mid, int qos_count, const int *granted_qos) { guard lock(this_mutex); on_subscribe(mid, mid, granted_qos); } void on_unsubscribe_wrapper(int mid) { guard lock(this_mutex); on_unsubscribe(mid); } void on_log_wrapper(int level, const char *str) { guard lock(this_mutex); log4cplus::LogLevel l; if (level == MOSQ_LOG_INFO) { l = log4cplus::INFO_LOG_LEVEL; } else if (level == MOSQ_LOG_NOTICE) { l = log4cplus::INFO_LOG_LEVEL; } else if (level == MOSQ_LOG_WARNING) { l = log4cplus::WARN_LOG_LEVEL; } else if (level == MOSQ_LOG_ERR) { l = log4cplus::FATAL_LOG_LEVEL; } else { l = log4cplus::DEBUG_LOG_LEVEL; } if (logger.isEnabledFor(l)) { log4cplus::tostringstream _log4cplus_buf; _log4cplus_buf << "mosquitto: " << str; logger.forcedLog(l, _log4cplus_buf.str()); } on_log(level, str); } public: void disconnect() { LOG4CPLUS_INFO(logger, "Disconnecting, connected: " << (connected() ? "yes" : "no")); int rc = mosquitto_disconnect(mosquitto); LOG4CPLUS_DEBUG(logger, "mosquitto_disconnect: " << error_to_string(rc)); } void subscribe(int *mid, const string &topic, int qos) { int rc = mosquitto_subscribe(mosquitto, mid, topic.c_str(), qos); assert_success("mosquitto_subscribe", rc); } void publish(int *mid, const string &topic, int qos, bool retain, const string &s) { auto len = s.length(); auto int_max = std::numeric_limits::max(); if (len > int_max) { len = static_cast(int_max); } publish(mid, topic, qos, retain, static_cast(len), s.c_str()); } void publish(int *mid, const string &topic, int qos, bool retain, int payload_len, const void *payload) { // if (!connected_) { // throw mqtt_error("not connected", MOSQ_ERR_NO_CONN); // } LOG4CPLUS_DEBUG(logger, "Publishing " << payload_len << " bytes to " << topic); int rc = mosquitto_publish(mosquitto, mid, topic.c_str(), payload_len, payload, qos, retain); if (rc == MOSQ_ERR_SUCCESS) { guard lock(this_mutex); unacked_messages_++; } assert_success("mosquitto_publish", rc); } // void set_should_reconnect(bool should_reconnect) { // this->should_reconnect_ = should_reconnect; // } public: void poll() { poll(p_tag); } private: void poll(threaded_tag) { } void poll(polling_tag) { int rc = mosquitto_loop(mosquitto, 100, 1); assert_success("mosquitto_loop", rc); } // ------------------------------------------- // Callbacks // ------------------------------------------- protected: virtual void on_connect(int rc) { } virtual void on_disconnect(bool was_connecting, bool was_connected, int rc) { } virtual void on_publish(int mid) { } virtual void on_message(const struct mosquitto_message *message) { } virtual void on_subscribe(int mid, int qos_count, const int *granted_qos) { } virtual void on_unsubscribe(int mid) { } virtual void on_log(int level, const char *str) { } private: static void on_connect_cb(struct mosquitto *m, void *self, int rc) { static_cast(self)->on_connect_wrapper(rc); } static void on_disconnect_cb(struct mosquitto *m, void *self, int rc) { static_cast(self)->on_disconnect_wrapper(rc); } static void on_publish_cb(struct mosquitto *m, void *self, int rc) { static_cast(self)->on_publish_wrapper(rc); } static void on_message_cb(struct mosquitto *m, void *self, const mosquitto_message *message) { static_cast(self)->on_message_wrapper(message); } static void on_subscribe_cb(struct mosquitto *m, void *self, int mid, int qos_count, const int *granted_qos) { static_cast(self)->on_subscribe_wrapper(mid, qos_count, granted_qos); } static void on_unsubscribe_cb(struct mosquitto *m, void *self, int mid) { static_cast(self)->on_unsubscribe_wrapper(mid); } static void on_log_cb(struct mosquitto *m, void *self, int level, const char *str) { static_cast(self)->on_log_wrapper(level, str); } Logger logger = Logger::getInstance(LOG4CPLUS_TEXT("mqtt_client")); }; } } #endif