diff options
Diffstat (limited to 'raw-mqtt-consumer.cpp')
-rw-r--r-- | raw-mqtt-consumer.cpp | 66 |
1 files changed, 13 insertions, 53 deletions
diff --git a/raw-mqtt-consumer.cpp b/raw-mqtt-consumer.cpp index 3cc43be..56b620e 100644 --- a/raw-mqtt-consumer.cpp +++ b/raw-mqtt-consumer.cpp @@ -19,46 +19,28 @@ static bool should_run; static string mqtt_host; static int mqtt_port; static string mqtt_topic; +static string mqtt_client_id; +static bool mqtt_clean_session; static string keyspace_name = "soil_moisture"; static unique_ptr<cassandra_session> current_cassandra_session; -class raw_mqtt_client /*: private mosqpp::mosquittopp*/ { +class raw_mqtt_client : public mqtt_client<mqtt_client_personality::polling> { public: typedef std::function<void(const struct mosquitto_message *)> callback_t; - raw_mqtt_client(callback_t on_message_) : on_message_(on_message_) { - cout << "Connecting to " << mqtt_host << ":" << mqtt_port << endl; - - mosquitto = mosquitto_new(nullptr, true, this); - if(!mosquitto) { - throw runtime_error("Could not initialize mosquitto instance"); - } - mosquitto_connect_callback_set(mosquitto, on_connect_cb); - mosquitto_disconnect_callback_set(mosquitto, on_disconnect_cb); - mosquitto_publish_callback_set(mosquitto, on_publish_cb); - mosquitto_message_callback_set(mosquitto, on_message_cb); - mosquitto_subscribe_callback_set(mosquitto, on_subscribe_cb); - mosquitto_unsubscribe_callback_set(mosquitto, on_unsubscribe_cb); - mosquitto_log_callback_set(mosquitto, on_log_cb); - - mosquitto_loop_start(mosquitto); - mosquitto_connect_async(mosquitto, mqtt_host.c_str(), mqtt_port, 10); + raw_mqtt_client(callback_t on_message_) : mqtt_client(mqtt_host, mqtt_port, 10, mqtt_client_id, mqtt_clean_session), + on_message_(on_message_) { + cout << "Connecting to MQTT broker at " << mqtt_host << ":" << mqtt_port << endl; + connect(); } ~raw_mqtt_client() { - mosquitto_loop_stop(mosquitto, true); - mosquitto_disconnect(mosquitto); } private: callback_t on_message_; bool subscribed = false; - struct mosquitto *mosquitto; - - static void on_connect_cb(struct mosquitto *m, void *self, int rc) { - static_cast<raw_mqtt_client *>(self)->on_connect(rc); - } void on_connect(int rc) { cout << "Connected to MQTT broker, rc=" << rc << endl; @@ -67,55 +49,31 @@ private: if (!subscribed) { subscribed = true; cout << "Subscribing..." << endl; - mosquitto_subscribe(mosquitto, nullptr, mqtt_topic.c_str(), qos); + subscribe(nullptr, mqtt_topic.c_str(), qos); } } - static void on_disconnect_cb(struct mosquitto *m, void *self, int rc) { - static_cast<raw_mqtt_client *>(self)->on_disconnect(rc); - } - void on_disconnect(int rc) { subscribed = false; - cout << "Oops, disconnected, rc=" << rc << ":" << mosqpp::strerror(rc) << endl; - } - - static void on_publish_cb(struct mosquitto *m, void *self, int rc) { - static_cast<raw_mqtt_client *>(self)->on_publish(rc); + cout << "Oops, disconnected: " << error_to_string(rc) << endl; } void on_publish(int mid) { } - static void on_message_cb(struct mosquitto *m, void *self, const mosquitto_message *message) { - static_cast<raw_mqtt_client *>(self)->on_message(message); - } - void on_message(const struct mosquitto_message *message) { on_message_(message); } - static void on_subscribe_cb(struct mosquitto *m, void *self, int mid, int qos_count, const int *granted_qos) { - static_cast<raw_mqtt_client *>(self)->on_subscribe(mid, qos_count, granted_qos); - } - void on_subscribe(int mid, int qos_count, const int *granted_qos) { cout << "Subscribed" << endl; } - static void on_unsubscribe_cb(struct mosquitto *m, void *self, int mid) { - static_cast<raw_mqtt_client *>(self)->on_unsubscribe(mid); - } - void on_unsubscribe(int mid) { cout << "Oops, unsubscribed" << endl; } - static void on_log_cb(struct mosquitto *m, void *self, int level, const char *str) { - static_cast<raw_mqtt_client *>(self)->on_log(level, str); - } - void on_log(int level, const char *str) { cout << "MQTT: " << level << ":" << str << endl; } @@ -209,6 +167,8 @@ int main(int argc, const char **argv) { all.add_options()("mqtt-host", po::value<>(&mqtt_host)->default_value("trygvis.io")); all.add_options()("mqtt-port", po::value<>(&mqtt_port)->default_value(1883)); all.add_options()("mqtt-topic", po::value<>(&mqtt_topic)->required()); + all.add_options()("mqtt-client-id", po::value<>(&mqtt_client_id)); + all.add_options()("mqtt-clean-session", po::value<>(&mqtt_clean_session)); po::variables_map vm; try { @@ -256,14 +216,14 @@ int main(int argc, const char **argv) { cout << "Connected to Cassandra" << endl; current_cassandra_session = std::move(session); - cout << "Connecting to MQTT broker" << endl; raw_mqtt_client mqtt_client(on_message); should_run = true; while (should_run) { cout << "sleeping.." << endl; - std::this_thread::sleep_for(60s); + mqtt_client.poll(); + std::this_thread::sleep_for(10s); } current_cassandra_session.release(); |