From a0d83abee95e0ba7fcb882c970a76576cd3afb81 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Tue, 21 Jul 2015 11:26:42 +0200 Subject: Fixing two bugs in mqtt-publish: o The last character after EOF was parsed twice. o Create a separate counter for unacked messages instead of checking want_write. --- apps/mqtt-publish.cpp | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) (limited to 'apps') diff --git a/apps/mqtt-publish.cpp b/apps/mqtt-publish.cpp index 63ce998..77b0c5a 100644 --- a/apps/mqtt-publish.cpp +++ b/apps/mqtt-publish.cpp @@ -3,7 +3,6 @@ #include #include #include -#include #include #include #include @@ -33,7 +32,8 @@ public: topic_name(topic_name), keep_alive(keep_alive), connected(false), - should_reconnect(false) { + should_reconnect(false), + unacked_messages_(0){ } ~MqttSampleOutputStream() { @@ -125,6 +125,7 @@ public: LOG4CPLUS_DEBUG(logger, "message ACKed, message id=" << message_id); cv.notify_all(); + unacked_messages_--; } void wait() { @@ -132,6 +133,10 @@ public: cv.wait(lk); } + int unacked_messages() { + return unacked_messages_; + } + void write(SampleRecord const &sample) override { if (sample.empty()) { return; @@ -153,6 +158,7 @@ public: if ((err = publish(&message_id, topic_name.c_str(), (int) s.size(), message, qos, retain)) != MOSQ_ERR_SUCCESS) { LOG4CPLUS_INFO(logger, "Could not publish messaget to topic " << topic_name << ": " << port << ": " << mosqpp::strerror(err)); } else { + unacked_messages_++; LOG4CPLUS_DEBUG(logger, "Published message, message id=" << message_id); } } @@ -167,6 +173,7 @@ public: private: atomic_bool connected, should_reconnect; Logger logger = Logger::getInstance(LOG4CPLUS_TEXT("mqtt")); + atomic_int unacked_messages_; condition_variable cv; mutex cv_mutex; }; @@ -245,11 +252,13 @@ public: char data[100]; while (!inputStream->eof()) { inputStream->get(data[0]); - auto buf = boost::asio::buffer(data, 1); + auto buf = boost::asio::buffer(data, (size_t) inputStream->gcount()); input->process(buf); } - while (output->want_write()) { + input->finish(); + + while (output->unacked_messages()) { output->wait(); } -- cgit v1.2.3