diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2015-07-21 11:26:42 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2015-07-21 11:26:42 +0200 |
commit | a0d83abee95e0ba7fcb882c970a76576cd3afb81 (patch) | |
tree | 06e3b53e32ce4eee61f6a2bd9c4dbaed6424323c /apps | |
parent | 5dbc69fb16c71c02859ec8665c55a1e2b068ddd7 (diff) | |
download | ble-toys-a0d83abee95e0ba7fcb882c970a76576cd3afb81.tar.gz ble-toys-a0d83abee95e0ba7fcb882c970a76576cd3afb81.tar.bz2 ble-toys-a0d83abee95e0ba7fcb882c970a76576cd3afb81.tar.xz ble-toys-a0d83abee95e0ba7fcb882c970a76576cd3afb81.zip |
Fixing two bugs in mqtt-publish:
o The last character after EOF was parsed twice.
o Create a separate counter for unacked messages instead of checking want_write.
Diffstat (limited to 'apps')
-rw-r--r-- | apps/mqtt-publish.cpp | 17 |
1 files changed, 13 insertions, 4 deletions
diff --git a/apps/mqtt-publish.cpp b/apps/mqtt-publish.cpp index 63ce998..77b0c5a 100644 --- a/apps/mqtt-publish.cpp +++ b/apps/mqtt-publish.cpp @@ -3,7 +3,6 @@ #include <iostream> #include <fstream> #include <iomanip> -#include <chrono> #include <boost/uuid/uuid_io.hpp> #include <thread> #include <mosquittopp.h> @@ -33,7 +32,8 @@ public: topic_name(topic_name), keep_alive(keep_alive), connected(false), - should_reconnect(false) { + should_reconnect(false), + unacked_messages_(0){ } ~MqttSampleOutputStream() { @@ -125,6 +125,7 @@ public: LOG4CPLUS_DEBUG(logger, "message ACKed, message id=" << message_id); cv.notify_all(); + unacked_messages_--; } void wait() { @@ -132,6 +133,10 @@ public: cv.wait(lk); } + int unacked_messages() { + return unacked_messages_; + } + void write(SampleRecord const &sample) override { if (sample.empty()) { return; @@ -153,6 +158,7 @@ public: if ((err = publish(&message_id, topic_name.c_str(), (int) s.size(), message, qos, retain)) != MOSQ_ERR_SUCCESS) { LOG4CPLUS_INFO(logger, "Could not publish messaget to topic " << topic_name << ": " << port << ": " << mosqpp::strerror(err)); } else { + unacked_messages_++; LOG4CPLUS_DEBUG(logger, "Published message, message id=" << message_id); } } @@ -167,6 +173,7 @@ public: private: atomic_bool connected, should_reconnect; Logger logger = Logger::getInstance(LOG4CPLUS_TEXT("mqtt")); + atomic_int unacked_messages_; condition_variable cv; mutex cv_mutex; }; @@ -245,11 +252,13 @@ public: char data[100]; while (!inputStream->eof()) { inputStream->get(data[0]); - auto buf = boost::asio::buffer(data, 1); + auto buf = boost::asio::buffer(data, (size_t) inputStream->gcount()); input->process(buf); } - while (output->want_write()) { + input->finish(); + + while (output->unacked_messages()) { output->wait(); } |