aboutsummaryrefslogtreecommitdiff
path: root/mqtt_support.h
diff options
context:
space:
mode:
Diffstat (limited to 'mqtt_support.h')
-rw-r--r--mqtt_support.h206
1 files changed, 204 insertions, 2 deletions
diff --git a/mqtt_support.h b/mqtt_support.h
index 2ed5134..6b000e9 100644
--- a/mqtt_support.h
+++ b/mqtt_support.h
@@ -1,13 +1,17 @@
#ifndef TRYGVIS_MQTT_SUPPORT_H
#define TRYGVIS_MQTT_SUPPORT_H
-#include "mosquittopp.h"
+#include <mutex>
+#include <string>
+#include <exception>
+#include <iostream>
+#include <cstring>
#include "mosquitto.h"
namespace trygvis {
namespace mqtt_support {
-using namespace mosqpp;
+using namespace std;
class mqtt_lib {
public:
@@ -20,6 +24,204 @@ public:
}
};
+class mqtt_error : public std::runtime_error {
+
+public:
+ const int error;
+
+ mqtt_error(const string &what, int rc) : runtime_error(what), error(rc) {
+ }
+};
+
+enum mqtt_client_personality {
+ threaded,
+ polling
+};
+
+extern mutex mqtt_client_mutex_;
+extern int mqtt_client_instance_count;
+
+template<mqtt_client_personality personality>
+class mqtt_client {
+ template<bool>
+ struct personality_tag {
+ };
+
+ typedef personality_tag<mqtt_client_personality::threaded> threaded_tag;
+ typedef personality_tag<mqtt_client_personality::polling> polling_tag;
+ const personality_tag<personality> p_tag{};
+
+ struct mosquitto *mosquitto;
+
+ const string host;
+ const int port;
+ const int keep_alive;
+
+ void check(const string &function, int rc) {
+ if (rc != MOSQ_ERR_SUCCESS) {
+ throw mqtt_error(function + ": " + error_to_string(rc), rc);
+ }
+ }
+
+public:
+ mqtt_client(const string &host, const int port, const int keep_alive, const string &id, const bool clean_session) :
+ host(host), port(port), keep_alive(keep_alive) {
+ lock_guard <mutex> lock(mqtt_client_mutex_);
+
+ if (mqtt_client_instance_count++ == 0) {
+ mosquitto_lib_init();
+ }
+
+ mosquitto = mosquitto_new(id.size() ? id.c_str() : nullptr, clean_session, this);
+ if (!mosquitto) {
+ string err = strerror(errno);
+ throw runtime_error("Could not initialize mosquitto instance: " + err);
+ }
+ mosquitto_connect_callback_set(mosquitto, on_connect_cb);
+ mosquitto_disconnect_callback_set(mosquitto, on_disconnect_cb);
+ mosquitto_publish_callback_set(mosquitto, on_publish_cb);
+ mosquitto_message_callback_set(mosquitto, on_message_cb);
+ mosquitto_subscribe_callback_set(mosquitto, on_subscribe_cb);
+ mosquitto_unsubscribe_callback_set(mosquitto, on_unsubscribe_cb);
+ mosquitto_log_callback_set(mosquitto, on_log_cb);
+
+ post_construct(p_tag);
+ }
+
+private:
+ void post_construct(threaded_tag) {
+ cout << "mosquitto_loop_start" << endl;
+ int rc = mosquitto_loop_start(mosquitto);
+ check("mosquitto_loop_start", rc);
+ }
+
+ void post_construct(polling_tag) {
+ }
+
+public:
+
+ virtual ~mqtt_client() {
+ lock_guard <mutex> lock(mqtt_client_mutex_);
+
+ int rc = mosquitto_loop_stop(mosquitto, true);
+ if (!rc) {
+ cerr << "mosquitto_loop_stop: " << error_to_string(rc) << endl;
+ }
+ disconnect();
+
+ if (--mqtt_client_instance_count == 0) {
+ mosquitto_lib_init();
+ }
+ }
+
+ void connect() {
+ connect(p_tag);
+ }
+
+private:
+ void connect(threaded_tag) {
+ cout << "mosquitto_connect_async" << endl;
+ int rc = mosquitto_connect_async(mosquitto, host.c_str(), port, keep_alive);
+ check("mosquitto_connect_async", rc);
+ }
+
+ void connect(polling_tag) {
+ cout << "mosquitto_connect" << endl;
+ int rc = mosquitto_connect(mosquitto, host.c_str(), port, keep_alive);
+ check("mosquitto_connect", rc);
+ }
+
+public:
+ void disconnect() {
+ int rc = mosquitto_disconnect(mosquitto);
+ if (!rc) {
+ cerr << "mosquitto_disconnect: " << error_to_string(rc) << endl;
+ }
+ }
+
+ void subscribe(int *mid, const string &topic, int qos) {
+ int rc = mosquitto_subscribe(mosquitto, mid, topic.c_str(), qos);
+ check("mosquitto_subscribe", rc);
+ }
+
+ inline
+ string error_to_string(int rc) {
+ if (rc == MOSQ_ERR_ERRNO) {
+ return string(strerror(errno));
+ }
+ return string(mosquitto_strerror(rc));
+ }
+
+private:
+ void poll(threaded_tag) {
+ }
+
+ void poll(polling_tag) {
+ int rc = mosquitto_loop(mosquitto, 100, 1);
+ check("mosquitto_loop", rc);
+ }
+
+public:
+ void poll() {
+ poll(p_tag);
+ }
+
+ // -------------------------------------------
+ // Callbacks
+ // -------------------------------------------
+
+protected:
+ virtual void on_connect(int rc) {
+ }
+
+ virtual void on_disconnect(int rc) {
+ }
+
+ virtual void on_publish(int mid) {
+ }
+
+ virtual void on_message(const struct mosquitto_message *message) {
+ }
+
+ virtual void on_subscribe(int mid, int qos_count, const int *granted_qos) {
+ }
+
+ virtual void on_unsubscribe(int mid) {
+ }
+
+ virtual void on_log(int level, const char *str) {
+ }
+
+private:
+ static void on_connect_cb(struct mosquitto *m, void *self, int rc) {
+ static_cast<mqtt_client *>(self)->on_connect(rc);
+ }
+
+ static void on_disconnect_cb(struct mosquitto *m, void *self, int rc) {
+ static_cast<mqtt_client *>(self)->on_disconnect(rc);
+ }
+
+ static void on_publish_cb(struct mosquitto *m, void *self, int rc) {
+ static_cast<mqtt_client *>(self)->on_publish(rc);
+ }
+
+ static void on_message_cb(struct mosquitto *m, void *self, const mosquitto_message *message) {
+ static_cast<mqtt_client *>(self)->on_message(message);
+ }
+
+ static void on_subscribe_cb(struct mosquitto *m, void *self, int mid, int qos_count, const int *granted_qos) {
+ static_cast<mqtt_client *>(self)->on_subscribe(mid, qos_count, granted_qos);
+ }
+
+ static void on_unsubscribe_cb(struct mosquitto *m, void *self, int mid) {
+ static_cast<mqtt_client *>(self)->on_unsubscribe(mid);
+ }
+
+ static void on_log_cb(struct mosquitto *m, void *self, int level, const char *str) {
+ static_cast<mqtt_client *>(self)->on_log(level, str);
+ }
+};
+
}
}