#ifndef TRYGVIS_MQTT_SUPPORT_H #define TRYGVIS_MQTT_SUPPORT_H #include #include #include #include #include #include "mosquitto.h" namespace trygvis { namespace mqtt_support { using namespace std; class mqtt_lib { public: mqtt_lib() { mosquitto_lib_init(); } ~mqtt_lib() { mosquitto_lib_cleanup(); } }; class mqtt_error : public std::runtime_error { public: const int error; mqtt_error(const string &what, int rc) : runtime_error(what), error(rc) { } }; enum mqtt_client_personality { threaded, polling }; extern mutex mqtt_client_mutex_; extern int mqtt_client_instance_count; template class mqtt_client { template struct personality_tag { }; typedef personality_tag threaded_tag; typedef personality_tag polling_tag; const personality_tag p_tag{}; struct mosquitto *mosquitto; const string host; const int port; const int keep_alive; void check(const string &function, int rc) { if (rc != MOSQ_ERR_SUCCESS) { throw mqtt_error(function + ": " + error_to_string(rc), rc); } } public: mqtt_client(const string &host, const int port, const int keep_alive, const string &id, const bool clean_session) : host(host), port(port), keep_alive(keep_alive) { lock_guard lock(mqtt_client_mutex_); if (mqtt_client_instance_count++ == 0) { mosquitto_lib_init(); } mosquitto = mosquitto_new(id.size() ? id.c_str() : nullptr, clean_session, this); if (!mosquitto) { string err = strerror(errno); throw runtime_error("Could not initialize mosquitto instance: " + err); } mosquitto_connect_callback_set(mosquitto, on_connect_cb); mosquitto_disconnect_callback_set(mosquitto, on_disconnect_cb); mosquitto_publish_callback_set(mosquitto, on_publish_cb); mosquitto_message_callback_set(mosquitto, on_message_cb); mosquitto_subscribe_callback_set(mosquitto, on_subscribe_cb); mosquitto_unsubscribe_callback_set(mosquitto, on_unsubscribe_cb); mosquitto_log_callback_set(mosquitto, on_log_cb); post_construct(p_tag); } private: void post_construct(threaded_tag) { cout << "mosquitto_loop_start" << endl; int rc = mosquitto_loop_start(mosquitto); check("mosquitto_loop_start", rc); } void post_construct(polling_tag) { } public: virtual ~mqtt_client() { lock_guard lock(mqtt_client_mutex_); int rc = mosquitto_loop_stop(mosquitto, true); if (!rc) { cerr << "mosquitto_loop_stop: " << error_to_string(rc) << endl; } disconnect(); if (--mqtt_client_instance_count == 0) { mosquitto_lib_init(); } } void connect() { connect(p_tag); } private: void connect(threaded_tag) { cout << "mosquitto_connect_async" << endl; int rc = mosquitto_connect_async(mosquitto, host.c_str(), port, keep_alive); check("mosquitto_connect_async", rc); } void connect(polling_tag) { cout << "mosquitto_connect" << endl; int rc = mosquitto_connect(mosquitto, host.c_str(), port, keep_alive); check("mosquitto_connect", rc); } public: void disconnect() { int rc = mosquitto_disconnect(mosquitto); if (!rc) { cerr << "mosquitto_disconnect: " << error_to_string(rc) << endl; } } void subscribe(int *mid, const string &topic, int qos) { int rc = mosquitto_subscribe(mosquitto, mid, topic.c_str(), qos); check("mosquitto_subscribe", rc); } inline string error_to_string(int rc) { if (rc == MOSQ_ERR_ERRNO) { return string(strerror(errno)); } return string(mosquitto_strerror(rc)); } private: void poll(threaded_tag) { } void poll(polling_tag) { int rc = mosquitto_loop(mosquitto, 100, 1); check("mosquitto_loop", rc); } public: void poll() { poll(p_tag); } // ------------------------------------------- // Callbacks // ------------------------------------------- protected: virtual void on_connect(int rc) { } virtual void on_disconnect(int rc) { } virtual void on_publish(int mid) { } virtual void on_message(const struct mosquitto_message *message) { } virtual void on_subscribe(int mid, int qos_count, const int *granted_qos) { } virtual void on_unsubscribe(int mid) { } virtual void on_log(int level, const char *str) { } private: static void on_connect_cb(struct mosquitto *m, void *self, int rc) { static_cast(self)->on_connect(rc); } static void on_disconnect_cb(struct mosquitto *m, void *self, int rc) { static_cast(self)->on_disconnect(rc); } static void on_publish_cb(struct mosquitto *m, void *self, int rc) { static_cast(self)->on_publish(rc); } static void on_message_cb(struct mosquitto *m, void *self, const mosquitto_message *message) { static_cast(self)->on_message(message); } static void on_subscribe_cb(struct mosquitto *m, void *self, int mid, int qos_count, const int *granted_qos) { static_cast(self)->on_subscribe(mid, qos_count, granted_qos); } static void on_unsubscribe_cb(struct mosquitto *m, void *self, int mid) { static_cast(self)->on_unsubscribe(mid); } static void on_log_cb(struct mosquitto *m, void *self, int level, const char *str) { static_cast(self)->on_log(level, str); } }; } } #endif