aboutsummaryrefslogtreecommitdiff
path: root/raw-mqtt-consumer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'raw-mqtt-consumer.cpp')
-rw-r--r--raw-mqtt-consumer.cpp66
1 files changed, 13 insertions, 53 deletions
diff --git a/raw-mqtt-consumer.cpp b/raw-mqtt-consumer.cpp
index 3cc43be..56b620e 100644
--- a/raw-mqtt-consumer.cpp
+++ b/raw-mqtt-consumer.cpp
@@ -19,46 +19,28 @@ static bool should_run;
static string mqtt_host;
static int mqtt_port;
static string mqtt_topic;
+static string mqtt_client_id;
+static bool mqtt_clean_session;
static string keyspace_name = "soil_moisture";
static unique_ptr<cassandra_session> current_cassandra_session;
-class raw_mqtt_client /*: private mosqpp::mosquittopp*/ {
+class raw_mqtt_client : public mqtt_client<mqtt_client_personality::polling> {
public:
typedef std::function<void(const struct mosquitto_message *)> callback_t;
- raw_mqtt_client(callback_t on_message_) : on_message_(on_message_) {
- cout << "Connecting to " << mqtt_host << ":" << mqtt_port << endl;
-
- mosquitto = mosquitto_new(nullptr, true, this);
- if(!mosquitto) {
- throw runtime_error("Could not initialize mosquitto instance");
- }
- mosquitto_connect_callback_set(mosquitto, on_connect_cb);
- mosquitto_disconnect_callback_set(mosquitto, on_disconnect_cb);
- mosquitto_publish_callback_set(mosquitto, on_publish_cb);
- mosquitto_message_callback_set(mosquitto, on_message_cb);
- mosquitto_subscribe_callback_set(mosquitto, on_subscribe_cb);
- mosquitto_unsubscribe_callback_set(mosquitto, on_unsubscribe_cb);
- mosquitto_log_callback_set(mosquitto, on_log_cb);
-
- mosquitto_loop_start(mosquitto);
- mosquitto_connect_async(mosquitto, mqtt_host.c_str(), mqtt_port, 10);
+ raw_mqtt_client(callback_t on_message_) : mqtt_client(mqtt_host, mqtt_port, 10, mqtt_client_id, mqtt_clean_session),
+ on_message_(on_message_) {
+ cout << "Connecting to MQTT broker at " << mqtt_host << ":" << mqtt_port << endl;
+ connect();
}
~raw_mqtt_client() {
- mosquitto_loop_stop(mosquitto, true);
- mosquitto_disconnect(mosquitto);
}
private:
callback_t on_message_;
bool subscribed = false;
- struct mosquitto *mosquitto;
-
- static void on_connect_cb(struct mosquitto *m, void *self, int rc) {
- static_cast<raw_mqtt_client *>(self)->on_connect(rc);
- }
void on_connect(int rc) {
cout << "Connected to MQTT broker, rc=" << rc << endl;
@@ -67,55 +49,31 @@ private:
if (!subscribed) {
subscribed = true;
cout << "Subscribing..." << endl;
- mosquitto_subscribe(mosquitto, nullptr, mqtt_topic.c_str(), qos);
+ subscribe(nullptr, mqtt_topic.c_str(), qos);
}
}
- static void on_disconnect_cb(struct mosquitto *m, void *self, int rc) {
- static_cast<raw_mqtt_client *>(self)->on_disconnect(rc);
- }
-
void on_disconnect(int rc) {
subscribed = false;
- cout << "Oops, disconnected, rc=" << rc << ":" << mosqpp::strerror(rc) << endl;
- }
-
- static void on_publish_cb(struct mosquitto *m, void *self, int rc) {
- static_cast<raw_mqtt_client *>(self)->on_publish(rc);
+ cout << "Oops, disconnected: " << error_to_string(rc) << endl;
}
void on_publish(int mid) {
}
- static void on_message_cb(struct mosquitto *m, void *self, const mosquitto_message *message) {
- static_cast<raw_mqtt_client *>(self)->on_message(message);
- }
-
void on_message(const struct mosquitto_message *message) {
on_message_(message);
}
- static void on_subscribe_cb(struct mosquitto *m, void *self, int mid, int qos_count, const int *granted_qos) {
- static_cast<raw_mqtt_client *>(self)->on_subscribe(mid, qos_count, granted_qos);
- }
-
void on_subscribe(int mid, int qos_count, const int *granted_qos) {
cout << "Subscribed" << endl;
}
- static void on_unsubscribe_cb(struct mosquitto *m, void *self, int mid) {
- static_cast<raw_mqtt_client *>(self)->on_unsubscribe(mid);
- }
-
void on_unsubscribe(int mid) {
cout << "Oops, unsubscribed" << endl;
}
- static void on_log_cb(struct mosquitto *m, void *self, int level, const char *str) {
- static_cast<raw_mqtt_client *>(self)->on_log(level, str);
- }
-
void on_log(int level, const char *str) {
cout << "MQTT: " << level << ":" << str << endl;
}
@@ -209,6 +167,8 @@ int main(int argc, const char **argv) {
all.add_options()("mqtt-host", po::value<>(&mqtt_host)->default_value("trygvis.io"));
all.add_options()("mqtt-port", po::value<>(&mqtt_port)->default_value(1883));
all.add_options()("mqtt-topic", po::value<>(&mqtt_topic)->required());
+ all.add_options()("mqtt-client-id", po::value<>(&mqtt_client_id));
+ all.add_options()("mqtt-clean-session", po::value<>(&mqtt_clean_session));
po::variables_map vm;
try {
@@ -256,14 +216,14 @@ int main(int argc, const char **argv) {
cout << "Connected to Cassandra" << endl;
current_cassandra_session = std::move(session);
- cout << "Connecting to MQTT broker" << endl;
raw_mqtt_client mqtt_client(on_message);
should_run = true;
while (should_run) {
cout << "sleeping.." << endl;
- std::this_thread::sleep_for(60s);
+ mqtt_client.poll();
+ std::this_thread::sleep_for(10s);
}
current_cassandra_session.release();