aboutsummaryrefslogtreecommitdiff
path: root/apps/mqtt-publish.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'apps/mqtt-publish.cpp')
-rw-r--r--apps/mqtt-publish.cpp17
1 files changed, 13 insertions, 4 deletions
diff --git a/apps/mqtt-publish.cpp b/apps/mqtt-publish.cpp
index 63ce998..77b0c5a 100644
--- a/apps/mqtt-publish.cpp
+++ b/apps/mqtt-publish.cpp
@@ -3,7 +3,6 @@
#include <iostream>
#include <fstream>
#include <iomanip>
-#include <chrono>
#include <boost/uuid/uuid_io.hpp>
#include <thread>
#include <mosquittopp.h>
@@ -33,7 +32,8 @@ public:
topic_name(topic_name),
keep_alive(keep_alive),
connected(false),
- should_reconnect(false) {
+ should_reconnect(false),
+ unacked_messages_(0){
}
~MqttSampleOutputStream() {
@@ -125,6 +125,7 @@ public:
LOG4CPLUS_DEBUG(logger, "message ACKed, message id=" << message_id);
cv.notify_all();
+ unacked_messages_--;
}
void wait() {
@@ -132,6 +133,10 @@ public:
cv.wait(lk);
}
+ int unacked_messages() {
+ return unacked_messages_;
+ }
+
void write(SampleRecord const &sample) override {
if (sample.empty()) {
return;
@@ -153,6 +158,7 @@ public:
if ((err = publish(&message_id, topic_name.c_str(), (int) s.size(), message, qos, retain)) != MOSQ_ERR_SUCCESS) {
LOG4CPLUS_INFO(logger, "Could not publish messaget to topic " << topic_name << ": " << port << ": " << mosqpp::strerror(err));
} else {
+ unacked_messages_++;
LOG4CPLUS_DEBUG(logger, "Published message, message id=" << message_id);
}
}
@@ -167,6 +173,7 @@ public:
private:
atomic_bool connected, should_reconnect;
Logger logger = Logger::getInstance(LOG4CPLUS_TEXT("mqtt"));
+ atomic_int unacked_messages_;
condition_variable cv;
mutex cv_mutex;
};
@@ -245,11 +252,13 @@ public:
char data[100];
while (!inputStream->eof()) {
inputStream->get(data[0]);
- auto buf = boost::asio::buffer(data, 1);
+ auto buf = boost::asio::buffer(data, (size_t) inputStream->gcount());
input->process(buf);
}
- while (output->want_write()) {
+ input->finish();
+
+ while (output->unacked_messages()) {
output->wait();
}