From e9847a2bc0781cb578dc9a0b9c469ecaf48fe584 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sat, 8 Aug 2015 23:03:12 +0200 Subject: o Adding a custom mosquitto c++ client. --- mqtt_support.h | 206 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 204 insertions(+), 2 deletions(-) (limited to 'mqtt_support.h') diff --git a/mqtt_support.h b/mqtt_support.h index 2ed5134..6b000e9 100644 --- a/mqtt_support.h +++ b/mqtt_support.h @@ -1,13 +1,17 @@ #ifndef TRYGVIS_MQTT_SUPPORT_H #define TRYGVIS_MQTT_SUPPORT_H -#include "mosquittopp.h" +#include +#include +#include +#include +#include #include "mosquitto.h" namespace trygvis { namespace mqtt_support { -using namespace mosqpp; +using namespace std; class mqtt_lib { public: @@ -20,6 +24,204 @@ public: } }; +class mqtt_error : public std::runtime_error { + +public: + const int error; + + mqtt_error(const string &what, int rc) : runtime_error(what), error(rc) { + } +}; + +enum mqtt_client_personality { + threaded, + polling +}; + +extern mutex mqtt_client_mutex_; +extern int mqtt_client_instance_count; + +template +class mqtt_client { + template + struct personality_tag { + }; + + typedef personality_tag threaded_tag; + typedef personality_tag polling_tag; + const personality_tag p_tag{}; + + struct mosquitto *mosquitto; + + const string host; + const int port; + const int keep_alive; + + void check(const string &function, int rc) { + if (rc != MOSQ_ERR_SUCCESS) { + throw mqtt_error(function + ": " + error_to_string(rc), rc); + } + } + +public: + mqtt_client(const string &host, const int port, const int keep_alive, const string &id, const bool clean_session) : + host(host), port(port), keep_alive(keep_alive) { + lock_guard lock(mqtt_client_mutex_); + + if (mqtt_client_instance_count++ == 0) { + mosquitto_lib_init(); + } + + mosquitto = mosquitto_new(id.size() ? id.c_str() : nullptr, clean_session, this); + if (!mosquitto) { + string err = strerror(errno); + throw runtime_error("Could not initialize mosquitto instance: " + err); + } + mosquitto_connect_callback_set(mosquitto, on_connect_cb); + mosquitto_disconnect_callback_set(mosquitto, on_disconnect_cb); + mosquitto_publish_callback_set(mosquitto, on_publish_cb); + mosquitto_message_callback_set(mosquitto, on_message_cb); + mosquitto_subscribe_callback_set(mosquitto, on_subscribe_cb); + mosquitto_unsubscribe_callback_set(mosquitto, on_unsubscribe_cb); + mosquitto_log_callback_set(mosquitto, on_log_cb); + + post_construct(p_tag); + } + +private: + void post_construct(threaded_tag) { + cout << "mosquitto_loop_start" << endl; + int rc = mosquitto_loop_start(mosquitto); + check("mosquitto_loop_start", rc); + } + + void post_construct(polling_tag) { + } + +public: + + virtual ~mqtt_client() { + lock_guard lock(mqtt_client_mutex_); + + int rc = mosquitto_loop_stop(mosquitto, true); + if (!rc) { + cerr << "mosquitto_loop_stop: " << error_to_string(rc) << endl; + } + disconnect(); + + if (--mqtt_client_instance_count == 0) { + mosquitto_lib_init(); + } + } + + void connect() { + connect(p_tag); + } + +private: + void connect(threaded_tag) { + cout << "mosquitto_connect_async" << endl; + int rc = mosquitto_connect_async(mosquitto, host.c_str(), port, keep_alive); + check("mosquitto_connect_async", rc); + } + + void connect(polling_tag) { + cout << "mosquitto_connect" << endl; + int rc = mosquitto_connect(mosquitto, host.c_str(), port, keep_alive); + check("mosquitto_connect", rc); + } + +public: + void disconnect() { + int rc = mosquitto_disconnect(mosquitto); + if (!rc) { + cerr << "mosquitto_disconnect: " << error_to_string(rc) << endl; + } + } + + void subscribe(int *mid, const string &topic, int qos) { + int rc = mosquitto_subscribe(mosquitto, mid, topic.c_str(), qos); + check("mosquitto_subscribe", rc); + } + + inline + string error_to_string(int rc) { + if (rc == MOSQ_ERR_ERRNO) { + return string(strerror(errno)); + } + return string(mosquitto_strerror(rc)); + } + +private: + void poll(threaded_tag) { + } + + void poll(polling_tag) { + int rc = mosquitto_loop(mosquitto, 100, 1); + check("mosquitto_loop", rc); + } + +public: + void poll() { + poll(p_tag); + } + + // ------------------------------------------- + // Callbacks + // ------------------------------------------- + +protected: + virtual void on_connect(int rc) { + } + + virtual void on_disconnect(int rc) { + } + + virtual void on_publish(int mid) { + } + + virtual void on_message(const struct mosquitto_message *message) { + } + + virtual void on_subscribe(int mid, int qos_count, const int *granted_qos) { + } + + virtual void on_unsubscribe(int mid) { + } + + virtual void on_log(int level, const char *str) { + } + +private: + static void on_connect_cb(struct mosquitto *m, void *self, int rc) { + static_cast(self)->on_connect(rc); + } + + static void on_disconnect_cb(struct mosquitto *m, void *self, int rc) { + static_cast(self)->on_disconnect(rc); + } + + static void on_publish_cb(struct mosquitto *m, void *self, int rc) { + static_cast(self)->on_publish(rc); + } + + static void on_message_cb(struct mosquitto *m, void *self, const mosquitto_message *message) { + static_cast(self)->on_message(message); + } + + static void on_subscribe_cb(struct mosquitto *m, void *self, int mid, int qos_count, const int *granted_qos) { + static_cast(self)->on_subscribe(mid, qos_count, granted_qos); + } + + static void on_unsubscribe_cb(struct mosquitto *m, void *self, int mid) { + static_cast(self)->on_unsubscribe(mid); + } + + static void on_log_cb(struct mosquitto *m, void *self, int level, const char *str) { + static_cast(self)->on_log(level, str); + } +}; + } } -- cgit v1.2.3