aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2015-07-21 11:26:42 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2015-07-21 11:26:42 +0200
commita0d83abee95e0ba7fcb882c970a76576cd3afb81 (patch)
tree06e3b53e32ce4eee61f6a2bd9c4dbaed6424323c
parent5dbc69fb16c71c02859ec8665c55a1e2b068ddd7 (diff)
downloadble-toys-a0d83abee95e0ba7fcb882c970a76576cd3afb81.tar.gz
ble-toys-a0d83abee95e0ba7fcb882c970a76576cd3afb81.tar.bz2
ble-toys-a0d83abee95e0ba7fcb882c970a76576cd3afb81.tar.xz
ble-toys-a0d83abee95e0ba7fcb882c970a76576cd3afb81.zip
Fixing two bugs in mqtt-publish:
o The last character after EOF was parsed twice. o Create a separate counter for unacked messages instead of checking want_write.
-rw-r--r--apps/mqtt-publish.cpp17
-rw-r--r--sensor/include/trygvis/sensor/io.h6
-rw-r--r--sensor/main/io.cpp1
3 files changed, 19 insertions, 5 deletions
diff --git a/apps/mqtt-publish.cpp b/apps/mqtt-publish.cpp
index 63ce998..77b0c5a 100644
--- a/apps/mqtt-publish.cpp
+++ b/apps/mqtt-publish.cpp
@@ -3,7 +3,6 @@
#include <iostream>
#include <fstream>
#include <iomanip>
-#include <chrono>
#include <boost/uuid/uuid_io.hpp>
#include <thread>
#include <mosquittopp.h>
@@ -33,7 +32,8 @@ public:
topic_name(topic_name),
keep_alive(keep_alive),
connected(false),
- should_reconnect(false) {
+ should_reconnect(false),
+ unacked_messages_(0){
}
~MqttSampleOutputStream() {
@@ -125,6 +125,7 @@ public:
LOG4CPLUS_DEBUG(logger, "message ACKed, message id=" << message_id);
cv.notify_all();
+ unacked_messages_--;
}
void wait() {
@@ -132,6 +133,10 @@ public:
cv.wait(lk);
}
+ int unacked_messages() {
+ return unacked_messages_;
+ }
+
void write(SampleRecord const &sample) override {
if (sample.empty()) {
return;
@@ -153,6 +158,7 @@ public:
if ((err = publish(&message_id, topic_name.c_str(), (int) s.size(), message, qos, retain)) != MOSQ_ERR_SUCCESS) {
LOG4CPLUS_INFO(logger, "Could not publish messaget to topic " << topic_name << ": " << port << ": " << mosqpp::strerror(err));
} else {
+ unacked_messages_++;
LOG4CPLUS_DEBUG(logger, "Published message, message id=" << message_id);
}
}
@@ -167,6 +173,7 @@ public:
private:
atomic_bool connected, should_reconnect;
Logger logger = Logger::getInstance(LOG4CPLUS_TEXT("mqtt"));
+ atomic_int unacked_messages_;
condition_variable cv;
mutex cv_mutex;
};
@@ -245,11 +252,13 @@ public:
char data[100];
while (!inputStream->eof()) {
inputStream->get(data[0]);
- auto buf = boost::asio::buffer(data, 1);
+ auto buf = boost::asio::buffer(data, (size_t) inputStream->gcount());
input->process(buf);
}
- while (output->want_write()) {
+ input->finish();
+
+ while (output->unacked_messages()) {
output->wait();
}
diff --git a/sensor/include/trygvis/sensor/io.h b/sensor/include/trygvis/sensor/io.h
index dd1460b..71b0b84 100644
--- a/sensor/include/trygvis/sensor/io.h
+++ b/sensor/include/trygvis/sensor/io.h
@@ -208,6 +208,12 @@ class SampleStreamParser {
public:
virtual int process(mutable_buffers_1 &buffer) = 0;
+ /**
+ * Tells the parser that the data that has been written so far either "is it" (there is no more input) or that
+ * it should just process what it has so far. It is ok to call process() after again after this method.
+ *
+ * TODO: should probably be renamed to "process()", "flush()" or "done()".
+ */
virtual int finish() = 0;
virtual sample_format_type type() {
diff --git a/sensor/main/io.cpp b/sensor/main/io.cpp
index 6c384ff..74bc796 100644
--- a/sensor/main/io.cpp
+++ b/sensor/main/io.cpp
@@ -71,7 +71,6 @@ void ThreadSafeSampleOutputStream::write(SampleRecord const &sample) {
underlying->write(sample);
}
-
AddTimestampSampleOutputStream::AddTimestampSampleOutputStream(unique_ptr<SampleOutputStream> underlying,
KeyDictionary &dict,
const string &timestamp_name) : underlying_(move(underlying)), timestamp_key(dict.indexOf(timestamp_name)) {