diff options
-rw-r--r-- | apps/mqtt-publish.cpp | 17 | ||||
-rw-r--r-- | sensor/include/trygvis/sensor/io.h | 6 | ||||
-rw-r--r-- | sensor/main/io.cpp | 1 |
3 files changed, 19 insertions, 5 deletions
diff --git a/apps/mqtt-publish.cpp b/apps/mqtt-publish.cpp index 63ce998..77b0c5a 100644 --- a/apps/mqtt-publish.cpp +++ b/apps/mqtt-publish.cpp @@ -3,7 +3,6 @@ #include <iostream> #include <fstream> #include <iomanip> -#include <chrono> #include <boost/uuid/uuid_io.hpp> #include <thread> #include <mosquittopp.h> @@ -33,7 +32,8 @@ public: topic_name(topic_name), keep_alive(keep_alive), connected(false), - should_reconnect(false) { + should_reconnect(false), + unacked_messages_(0){ } ~MqttSampleOutputStream() { @@ -125,6 +125,7 @@ public: LOG4CPLUS_DEBUG(logger, "message ACKed, message id=" << message_id); cv.notify_all(); + unacked_messages_--; } void wait() { @@ -132,6 +133,10 @@ public: cv.wait(lk); } + int unacked_messages() { + return unacked_messages_; + } + void write(SampleRecord const &sample) override { if (sample.empty()) { return; @@ -153,6 +158,7 @@ public: if ((err = publish(&message_id, topic_name.c_str(), (int) s.size(), message, qos, retain)) != MOSQ_ERR_SUCCESS) { LOG4CPLUS_INFO(logger, "Could not publish messaget to topic " << topic_name << ": " << port << ": " << mosqpp::strerror(err)); } else { + unacked_messages_++; LOG4CPLUS_DEBUG(logger, "Published message, message id=" << message_id); } } @@ -167,6 +173,7 @@ public: private: atomic_bool connected, should_reconnect; Logger logger = Logger::getInstance(LOG4CPLUS_TEXT("mqtt")); + atomic_int unacked_messages_; condition_variable cv; mutex cv_mutex; }; @@ -245,11 +252,13 @@ public: char data[100]; while (!inputStream->eof()) { inputStream->get(data[0]); - auto buf = boost::asio::buffer(data, 1); + auto buf = boost::asio::buffer(data, (size_t) inputStream->gcount()); input->process(buf); } - while (output->want_write()) { + input->finish(); + + while (output->unacked_messages()) { output->wait(); } diff --git a/sensor/include/trygvis/sensor/io.h b/sensor/include/trygvis/sensor/io.h index dd1460b..71b0b84 100644 --- a/sensor/include/trygvis/sensor/io.h +++ b/sensor/include/trygvis/sensor/io.h @@ -208,6 +208,12 @@ class SampleStreamParser { public: virtual int process(mutable_buffers_1 &buffer) = 0; + /** + * Tells the parser that the data that has been written so far either "is it" (there is no more input) or that + * it should just process what it has so far. It is ok to call process() after again after this method. + * + * TODO: should probably be renamed to "process()", "flush()" or "done()". + */ virtual int finish() = 0; virtual sample_format_type type() { diff --git a/sensor/main/io.cpp b/sensor/main/io.cpp index 6c384ff..74bc796 100644 --- a/sensor/main/io.cpp +++ b/sensor/main/io.cpp @@ -71,7 +71,6 @@ void ThreadSafeSampleOutputStream::write(SampleRecord const &sample) { underlying->write(sample); } - AddTimestampSampleOutputStream::AddTimestampSampleOutputStream(unique_ptr<SampleOutputStream> underlying, KeyDictionary &dict, const string ×tamp_name) : underlying_(move(underlying)), timestamp_key(dict.indexOf(timestamp_name)) { |