aboutsummaryrefslogtreecommitdiff
path: root/apps/mqtt-publish.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'apps/mqtt-publish.cpp')
-rw-r--r--apps/mqtt-publish.cpp183
1 files changed, 31 insertions, 152 deletions
diff --git a/apps/mqtt-publish.cpp b/apps/mqtt-publish.cpp
index 77b0c5a..25e8ae9 100644
--- a/apps/mqtt-publish.cpp
+++ b/apps/mqtt-publish.cpp
@@ -5,7 +5,8 @@
#include <iomanip>
#include <boost/uuid/uuid_io.hpp>
#include <thread>
-#include <mosquittopp.h>
+#include <log4cplus/loggingmacros.h>
+#include "mqtt_support.h"
#include "SoilMoisture.h"
#include "trygvis/sensor.h"
#include "trygvis/sensor/io.h"
@@ -19,124 +20,23 @@ using namespace std::chrono;
using namespace trygvis::apps;
using namespace trygvis::sensor;
using namespace trygvis::sensor::io;
-using namespace mosqpp;
+using namespace trygvis::mqtt_support;
-class MqttSampleOutputStream : public SampleOutputStream, public mosquittopp {
+class MqttSampleOutputStream : public SampleOutputStream {
public:
- MqttSampleOutputStream(const char *client_id, bool clean_session, string host, unsigned int port, string topic_name,
+ MqttSampleOutputStream(const o<string> &client_id, bool clean_session, string host, unsigned int port,
+ string topic_name,
unsigned int keep_alive)
- : SampleOutputStream(),
- mosquittopp(client_id, clean_session),
- host(host),
- port(port),
- topic_name(topic_name),
- keep_alive(keep_alive),
- connected(false),
- should_reconnect(false),
- unacked_messages_(0){
+ : SampleOutputStream(),
+ client(host, port, keep_alive, client_id, clean_session),
+ topic_name(topic_name) {
+ client.connect();
}
~MqttSampleOutputStream() {
- close();
+ client.disconnect();
};
- void connect() {
- should_reconnect = true;
- int err;
- if ((err = mosquittopp::connect_async(host.c_str(), port, keep_alive))) {
- string msg = "Could not connect to MQTT broker " + host + ":" + std::to_string(port) + ": ";
- msg += mosqpp::strerror(err);
- throw sample_exception(msg);
- }
-
- err = loop_start();
- if (err) {
- string msg = "Could not start network loop thread: ";
- msg += mosqpp::strerror(err);
- throw sample_exception(msg);
- }
- }
-
- void close() {
- if (connected) {
- LOG4CPLUS_INFO(logger, "Closing connection");
-
- should_reconnect = false;
-
- int rc;
- if ((rc = disconnect()) != MOSQ_ERR_SUCCESS) {
- LOG4CPLUS_DEBUG(logger, "Error when disconnecting from broker: " << mosquitto_strerror(rc));
- }
- }
-
- loop_stop(false);
- }
-
- void on_connect(int rc) override {
- LOG4CPLUS_INFO(logger, "Connected");
- if (rc == MOSQ_ERR_SUCCESS) {
- connected = true;
- }
- }
-
- void on_disconnect(int rc) override {
- if (!connected) {
- return;
- }
-
- if (should_reconnect) {
- LOG4CPLUS_INFO(logger, "Disconnected, reconnecting. Error: " << mosquitto_strerror(rc));
- rc = reconnect_async();
- if (rc != MOSQ_ERR_SUCCESS) {
- LOG4CPLUS_WARN(logger, "Error when reconnecting: " << mosquitto_strerror(rc));
- }
- } else {
- LOG4CPLUS_INFO(logger, "Disconnected");
- }
-
- connected = false;
- }
-
- void on_log(int level, const char *str) override {
- log4cplus::LogLevel l;
-
- if (level == MOSQ_LOG_INFO) {
- l = log4cplus::INFO_LOG_LEVEL;
- } else if (level == MOSQ_LOG_NOTICE) {
- l = log4cplus::INFO_LOG_LEVEL;
- } else if (level == MOSQ_LOG_WARNING) {
- l = log4cplus::WARN_LOG_LEVEL;
- } else if (level == MOSQ_LOG_ERR) {
- l = log4cplus::FATAL_LOG_LEVEL;
- } else if (level == MOSQ_LOG_DEBUG) {
- l = log4cplus::DEBUG_LOG_LEVEL;
- } else {
- l = log4cplus::DEBUG_LOG_LEVEL;
- }
-
- if ((logger).isEnabledFor(l)) {
- log4cplus::tostringstream _log4cplus_buf;
- _log4cplus_buf << "mosquitto: " << str;
- (logger).forcedLog(l, _log4cplus_buf.str());
- }
- }
-
- void on_publish(int message_id) override {
- LOG4CPLUS_DEBUG(logger, "message ACKed, message id=" << message_id);
-
- cv.notify_all();
- unacked_messages_--;
- }
-
- void wait() {
- std::unique_lock<std::mutex> lk(cv_mutex);
- cv.wait(lk);
- }
-
- int unacked_messages() {
- return unacked_messages_;
- }
-
void write(SampleRecord const &sample) override {
if (sample.empty()) {
return;
@@ -154,44 +54,19 @@ public:
int message_id;
const char *message = s.c_str();
- int err;
- if ((err = publish(&message_id, topic_name.c_str(), (int) s.size(), message, qos, retain)) != MOSQ_ERR_SUCCESS) {
- LOG4CPLUS_INFO(logger, "Could not publish messaget to topic " << topic_name << ": " << port << ": " << mosqpp::strerror(err));
- } else {
- unacked_messages_++;
- LOG4CPLUS_DEBUG(logger, "Published message, message id=" << message_id);
- }
+ client.publish(&message_id, topic_name, qos, retain, static_cast<int>(s.length()), s.c_str());
}
- const string host, topic_name;
- const unsigned int port;
- const unsigned int keep_alive;
+ const string topic_name;
+ mqtt_client<mqtt_client_personality::threaded> client;
const int qos = 2;
const bool retain = true;
-
-private:
- atomic_bool connected, should_reconnect;
- Logger logger = Logger::getInstance(LOG4CPLUS_TEXT("mqtt"));
- atomic_int unacked_messages_;
- condition_variable cv;
- mutex cv_mutex;
-};
-
-class mosquitto_raii {
-public:
- mosquitto_raii() {
- mosquitto_lib_init();
- }
-
- ~mosquitto_raii() {
- mosquitto_lib_init();
- }
};
class mqtt_publish : public app {
public:
- mqtt_publish() : app("mqtt-publish") {}
+ mqtt_publish() : app("mqtt-publish") { }
~mqtt_publish() = default;
@@ -218,7 +93,7 @@ public:
}
int main(app_execution &execution) override {
- mosquitto_raii mosquitto_raii;
+ mqtt_lib mqtt_lib;
auto desc = execution.desc;
auto vm = execution.vm;
@@ -235,41 +110,45 @@ public:
}
}
- const char *client_id_ = nullptr;
+ o <string> client_id_;
if (!vm["client-id"].empty()) {
- client_id_ = client_id.c_str();
+ client_id_ = client_id;
} else {
clean_session = true;
}
- auto output =
- make_shared<MqttSampleOutputStream>(client_id_, clean_session, host, port, topic_name, keep_alive);
+ auto output = make_shared<MqttSampleOutputStream>(client_id_, clean_session, host, port, topic_name,
+ keep_alive);
auto input = make_shared<KeyValueSampleStreamParser>(output, dict);
- output->connect();
+// while (!output->client.connected()) {
+// cout << "Waiting for connection" << endl;
+// output->client.wait();
+// }
char data[100];
while (!inputStream->eof()) {
inputStream->get(data[0]);
+ cout << "got data: " << inputStream->gcount() << endl;
auto buf = boost::asio::buffer(data, (size_t) inputStream->gcount());
input->process(buf);
}
input->finish();
- while (output->unacked_messages()) {
- output->wait();
+ while (output->client.unacked_messages()) {
+ cout << "finishing.. unacked messages: " << output->client.unacked_messages() << endl;
+ output->client.wait();
}
- output->close();
-
return EXIT_SUCCESS;
- } catch (std::runtime_error ex) {
+ } catch (std::runtime_error &ex) {
cout << "std::runtime_error: " << ex.what() << endl;
return EXIT_FAILURE;
- } catch (std::exception ex) {
+ } catch (std::exception &ex) {
cout << "std::exception: " << ex.what() << endl;
+ cout << "typeid: " << typeid(ex).name() << endl;
return EXIT_FAILURE;
}
}