From a0d83abee95e0ba7fcb882c970a76576cd3afb81 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Tue, 21 Jul 2015 11:26:42 +0200 Subject: Fixing two bugs in mqtt-publish: o The last character after EOF was parsed twice. o Create a separate counter for unacked messages instead of checking want_write. --- apps/mqtt-publish.cpp | 17 +++++++++++++---- sensor/include/trygvis/sensor/io.h | 6 ++++++ sensor/main/io.cpp | 1 - 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/apps/mqtt-publish.cpp b/apps/mqtt-publish.cpp index 63ce998..77b0c5a 100644 --- a/apps/mqtt-publish.cpp +++ b/apps/mqtt-publish.cpp @@ -3,7 +3,6 @@ #include #include #include -#include #include #include #include @@ -33,7 +32,8 @@ public: topic_name(topic_name), keep_alive(keep_alive), connected(false), - should_reconnect(false) { + should_reconnect(false), + unacked_messages_(0){ } ~MqttSampleOutputStream() { @@ -125,6 +125,7 @@ public: LOG4CPLUS_DEBUG(logger, "message ACKed, message id=" << message_id); cv.notify_all(); + unacked_messages_--; } void wait() { @@ -132,6 +133,10 @@ public: cv.wait(lk); } + int unacked_messages() { + return unacked_messages_; + } + void write(SampleRecord const &sample) override { if (sample.empty()) { return; @@ -153,6 +158,7 @@ public: if ((err = publish(&message_id, topic_name.c_str(), (int) s.size(), message, qos, retain)) != MOSQ_ERR_SUCCESS) { LOG4CPLUS_INFO(logger, "Could not publish messaget to topic " << topic_name << ": " << port << ": " << mosqpp::strerror(err)); } else { + unacked_messages_++; LOG4CPLUS_DEBUG(logger, "Published message, message id=" << message_id); } } @@ -167,6 +173,7 @@ public: private: atomic_bool connected, should_reconnect; Logger logger = Logger::getInstance(LOG4CPLUS_TEXT("mqtt")); + atomic_int unacked_messages_; condition_variable cv; mutex cv_mutex; }; @@ -245,11 +252,13 @@ public: char data[100]; while (!inputStream->eof()) { inputStream->get(data[0]); - auto buf = boost::asio::buffer(data, 1); + auto buf = boost::asio::buffer(data, (size_t) inputStream->gcount()); input->process(buf); } - while (output->want_write()) { + input->finish(); + + while (output->unacked_messages()) { output->wait(); } diff --git a/sensor/include/trygvis/sensor/io.h b/sensor/include/trygvis/sensor/io.h index dd1460b..71b0b84 100644 --- a/sensor/include/trygvis/sensor/io.h +++ b/sensor/include/trygvis/sensor/io.h @@ -208,6 +208,12 @@ class SampleStreamParser { public: virtual int process(mutable_buffers_1 &buffer) = 0; + /** + * Tells the parser that the data that has been written so far either "is it" (there is no more input) or that + * it should just process what it has so far. It is ok to call process() after again after this method. + * + * TODO: should probably be renamed to "process()", "flush()" or "done()". + */ virtual int finish() = 0; virtual sample_format_type type() { diff --git a/sensor/main/io.cpp b/sensor/main/io.cpp index 6c384ff..74bc796 100644 --- a/sensor/main/io.cpp +++ b/sensor/main/io.cpp @@ -71,7 +71,6 @@ void ThreadSafeSampleOutputStream::write(SampleRecord const &sample) { underlying->write(sample); } - AddTimestampSampleOutputStream::AddTimestampSampleOutputStream(unique_ptr underlying, KeyDictionary &dict, const string ×tamp_name) : underlying_(move(underlying)), timestamp_key(dict.indexOf(timestamp_name)) { -- cgit v1.2.3