From 8ded9e3d0bbc2d7cdc5b9f01b4fed9c8685caf82 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 14 Feb 2016 14:41:52 +0100 Subject: mqtt: Using mqtt_support utilities from the mqtt-cassandra bridge. --- apps/mqtt-publish.cpp | 183 +++++++++----------------------------------------- 1 file changed, 31 insertions(+), 152 deletions(-) (limited to 'apps/mqtt-publish.cpp') diff --git a/apps/mqtt-publish.cpp b/apps/mqtt-publish.cpp index 77b0c5a..25e8ae9 100644 --- a/apps/mqtt-publish.cpp +++ b/apps/mqtt-publish.cpp @@ -5,7 +5,8 @@ #include #include #include -#include +#include +#include "mqtt_support.h" #include "SoilMoisture.h" #include "trygvis/sensor.h" #include "trygvis/sensor/io.h" @@ -19,124 +20,23 @@ using namespace std::chrono; using namespace trygvis::apps; using namespace trygvis::sensor; using namespace trygvis::sensor::io; -using namespace mosqpp; +using namespace trygvis::mqtt_support; -class MqttSampleOutputStream : public SampleOutputStream, public mosquittopp { +class MqttSampleOutputStream : public SampleOutputStream { public: - MqttSampleOutputStream(const char *client_id, bool clean_session, string host, unsigned int port, string topic_name, + MqttSampleOutputStream(const o &client_id, bool clean_session, string host, unsigned int port, + string topic_name, unsigned int keep_alive) - : SampleOutputStream(), - mosquittopp(client_id, clean_session), - host(host), - port(port), - topic_name(topic_name), - keep_alive(keep_alive), - connected(false), - should_reconnect(false), - unacked_messages_(0){ + : SampleOutputStream(), + client(host, port, keep_alive, client_id, clean_session), + topic_name(topic_name) { + client.connect(); } ~MqttSampleOutputStream() { - close(); + client.disconnect(); }; - void connect() { - should_reconnect = true; - int err; - if ((err = mosquittopp::connect_async(host.c_str(), port, keep_alive))) { - string msg = "Could not connect to MQTT broker " + host + ":" + std::to_string(port) + ": "; - msg += mosqpp::strerror(err); - throw sample_exception(msg); - } - - err = loop_start(); - if (err) { - string msg = "Could not start network loop thread: "; - msg += mosqpp::strerror(err); - throw sample_exception(msg); - } - } - - void close() { - if (connected) { - LOG4CPLUS_INFO(logger, "Closing connection"); - - should_reconnect = false; - - int rc; - if ((rc = disconnect()) != MOSQ_ERR_SUCCESS) { - LOG4CPLUS_DEBUG(logger, "Error when disconnecting from broker: " << mosquitto_strerror(rc)); - } - } - - loop_stop(false); - } - - void on_connect(int rc) override { - LOG4CPLUS_INFO(logger, "Connected"); - if (rc == MOSQ_ERR_SUCCESS) { - connected = true; - } - } - - void on_disconnect(int rc) override { - if (!connected) { - return; - } - - if (should_reconnect) { - LOG4CPLUS_INFO(logger, "Disconnected, reconnecting. Error: " << mosquitto_strerror(rc)); - rc = reconnect_async(); - if (rc != MOSQ_ERR_SUCCESS) { - LOG4CPLUS_WARN(logger, "Error when reconnecting: " << mosquitto_strerror(rc)); - } - } else { - LOG4CPLUS_INFO(logger, "Disconnected"); - } - - connected = false; - } - - void on_log(int level, const char *str) override { - log4cplus::LogLevel l; - - if (level == MOSQ_LOG_INFO) { - l = log4cplus::INFO_LOG_LEVEL; - } else if (level == MOSQ_LOG_NOTICE) { - l = log4cplus::INFO_LOG_LEVEL; - } else if (level == MOSQ_LOG_WARNING) { - l = log4cplus::WARN_LOG_LEVEL; - } else if (level == MOSQ_LOG_ERR) { - l = log4cplus::FATAL_LOG_LEVEL; - } else if (level == MOSQ_LOG_DEBUG) { - l = log4cplus::DEBUG_LOG_LEVEL; - } else { - l = log4cplus::DEBUG_LOG_LEVEL; - } - - if ((logger).isEnabledFor(l)) { - log4cplus::tostringstream _log4cplus_buf; - _log4cplus_buf << "mosquitto: " << str; - (logger).forcedLog(l, _log4cplus_buf.str()); - } - } - - void on_publish(int message_id) override { - LOG4CPLUS_DEBUG(logger, "message ACKed, message id=" << message_id); - - cv.notify_all(); - unacked_messages_--; - } - - void wait() { - std::unique_lock lk(cv_mutex); - cv.wait(lk); - } - - int unacked_messages() { - return unacked_messages_; - } - void write(SampleRecord const &sample) override { if (sample.empty()) { return; @@ -154,44 +54,19 @@ public: int message_id; const char *message = s.c_str(); - int err; - if ((err = publish(&message_id, topic_name.c_str(), (int) s.size(), message, qos, retain)) != MOSQ_ERR_SUCCESS) { - LOG4CPLUS_INFO(logger, "Could not publish messaget to topic " << topic_name << ": " << port << ": " << mosqpp::strerror(err)); - } else { - unacked_messages_++; - LOG4CPLUS_DEBUG(logger, "Published message, message id=" << message_id); - } + client.publish(&message_id, topic_name, qos, retain, static_cast(s.length()), s.c_str()); } - const string host, topic_name; - const unsigned int port; - const unsigned int keep_alive; + const string topic_name; + mqtt_client client; const int qos = 2; const bool retain = true; - -private: - atomic_bool connected, should_reconnect; - Logger logger = Logger::getInstance(LOG4CPLUS_TEXT("mqtt")); - atomic_int unacked_messages_; - condition_variable cv; - mutex cv_mutex; -}; - -class mosquitto_raii { -public: - mosquitto_raii() { - mosquitto_lib_init(); - } - - ~mosquitto_raii() { - mosquitto_lib_init(); - } }; class mqtt_publish : public app { public: - mqtt_publish() : app("mqtt-publish") {} + mqtt_publish() : app("mqtt-publish") { } ~mqtt_publish() = default; @@ -218,7 +93,7 @@ public: } int main(app_execution &execution) override { - mosquitto_raii mosquitto_raii; + mqtt_lib mqtt_lib; auto desc = execution.desc; auto vm = execution.vm; @@ -235,41 +110,45 @@ public: } } - const char *client_id_ = nullptr; + o client_id_; if (!vm["client-id"].empty()) { - client_id_ = client_id.c_str(); + client_id_ = client_id; } else { clean_session = true; } - auto output = - make_shared(client_id_, clean_session, host, port, topic_name, keep_alive); + auto output = make_shared(client_id_, clean_session, host, port, topic_name, + keep_alive); auto input = make_shared(output, dict); - output->connect(); +// while (!output->client.connected()) { +// cout << "Waiting for connection" << endl; +// output->client.wait(); +// } char data[100]; while (!inputStream->eof()) { inputStream->get(data[0]); + cout << "got data: " << inputStream->gcount() << endl; auto buf = boost::asio::buffer(data, (size_t) inputStream->gcount()); input->process(buf); } input->finish(); - while (output->unacked_messages()) { - output->wait(); + while (output->client.unacked_messages()) { + cout << "finishing.. unacked messages: " << output->client.unacked_messages() << endl; + output->client.wait(); } - output->close(); - return EXIT_SUCCESS; - } catch (std::runtime_error ex) { + } catch (std::runtime_error &ex) { cout << "std::runtime_error: " << ex.what() << endl; return EXIT_FAILURE; - } catch (std::exception ex) { + } catch (std::exception &ex) { cout << "std::exception: " << ex.what() << endl; + cout << "typeid: " << typeid(ex).name() << endl; return EXIT_FAILURE; } } -- cgit v1.2.3