aboutsummaryrefslogtreecommitdiff
path: root/apps
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2015-07-21 11:26:42 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2015-07-21 11:26:42 +0200
commita0d83abee95e0ba7fcb882c970a76576cd3afb81 (patch)
tree06e3b53e32ce4eee61f6a2bd9c4dbaed6424323c /apps
parent5dbc69fb16c71c02859ec8665c55a1e2b068ddd7 (diff)
downloadble-toys-a0d83abee95e0ba7fcb882c970a76576cd3afb81.tar.gz
ble-toys-a0d83abee95e0ba7fcb882c970a76576cd3afb81.tar.bz2
ble-toys-a0d83abee95e0ba7fcb882c970a76576cd3afb81.tar.xz
ble-toys-a0d83abee95e0ba7fcb882c970a76576cd3afb81.zip
Fixing two bugs in mqtt-publish:
o The last character after EOF was parsed twice. o Create a separate counter for unacked messages instead of checking want_write.
Diffstat (limited to 'apps')
-rw-r--r--apps/mqtt-publish.cpp17
1 files changed, 13 insertions, 4 deletions
diff --git a/apps/mqtt-publish.cpp b/apps/mqtt-publish.cpp
index 63ce998..77b0c5a 100644
--- a/apps/mqtt-publish.cpp
+++ b/apps/mqtt-publish.cpp
@@ -3,7 +3,6 @@
#include <iostream>
#include <fstream>
#include <iomanip>
-#include <chrono>
#include <boost/uuid/uuid_io.hpp>
#include <thread>
#include <mosquittopp.h>
@@ -33,7 +32,8 @@ public:
topic_name(topic_name),
keep_alive(keep_alive),
connected(false),
- should_reconnect(false) {
+ should_reconnect(false),
+ unacked_messages_(0){
}
~MqttSampleOutputStream() {
@@ -125,6 +125,7 @@ public:
LOG4CPLUS_DEBUG(logger, "message ACKed, message id=" << message_id);
cv.notify_all();
+ unacked_messages_--;
}
void wait() {
@@ -132,6 +133,10 @@ public:
cv.wait(lk);
}
+ int unacked_messages() {
+ return unacked_messages_;
+ }
+
void write(SampleRecord const &sample) override {
if (sample.empty()) {
return;
@@ -153,6 +158,7 @@ public:
if ((err = publish(&message_id, topic_name.c_str(), (int) s.size(), message, qos, retain)) != MOSQ_ERR_SUCCESS) {
LOG4CPLUS_INFO(logger, "Could not publish messaget to topic " << topic_name << ": " << port << ": " << mosqpp::strerror(err));
} else {
+ unacked_messages_++;
LOG4CPLUS_DEBUG(logger, "Published message, message id=" << message_id);
}
}
@@ -167,6 +173,7 @@ public:
private:
atomic_bool connected, should_reconnect;
Logger logger = Logger::getInstance(LOG4CPLUS_TEXT("mqtt"));
+ atomic_int unacked_messages_;
condition_variable cv;
mutex cv_mutex;
};
@@ -245,11 +252,13 @@ public:
char data[100];
while (!inputStream->eof()) {
inputStream->get(data[0]);
- auto buf = boost::asio::buffer(data, 1);
+ auto buf = boost::asio::buffer(data, (size_t) inputStream->gcount());
input->process(buf);
}
- while (output->want_write()) {
+ input->finish();
+
+ while (output->unacked_messages()) {
output->wait();
}