aboutsummaryrefslogtreecommitdiff
path: root/apps/mqtt-publish.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'apps/mqtt-publish.cpp')
-rw-r--r--apps/mqtt-publish.cpp275
1 files changed, 275 insertions, 0 deletions
diff --git a/apps/mqtt-publish.cpp b/apps/mqtt-publish.cpp
new file mode 100644
index 0000000..63ce998
--- /dev/null
+++ b/apps/mqtt-publish.cpp
@@ -0,0 +1,275 @@
+#include <atomic>
+#include <condition_variable>
+#include <iostream>
+#include <fstream>
+#include <iomanip>
+#include <chrono>
+#include <boost/uuid/uuid_io.hpp>
+#include <thread>
+#include <mosquittopp.h>
+#include "SoilMoisture.h"
+#include "trygvis/sensor.h"
+#include "trygvis/sensor/io.h"
+#include "apps.h"
+
+namespace trygvis {
+namespace apps {
+
+using namespace std;
+using namespace std::chrono;
+using namespace trygvis::apps;
+using namespace trygvis::sensor;
+using namespace trygvis::sensor::io;
+using namespace mosqpp;
+
+class MqttSampleOutputStream : public SampleOutputStream, public mosquittopp {
+public:
+ MqttSampleOutputStream(const char *client_id, bool clean_session, string host, unsigned int port, string topic_name,
+ unsigned int keep_alive)
+ : SampleOutputStream(),
+ mosquittopp(client_id, clean_session),
+ host(host),
+ port(port),
+ topic_name(topic_name),
+ keep_alive(keep_alive),
+ connected(false),
+ should_reconnect(false) {
+ }
+
+ ~MqttSampleOutputStream() {
+ close();
+ };
+
+ void connect() {
+ should_reconnect = true;
+ int err;
+ if ((err = mosquittopp::connect_async(host.c_str(), port, keep_alive))) {
+ string msg = "Could not connect to MQTT broker " + host + ":" + std::to_string(port) + ": ";
+ msg += mosqpp::strerror(err);
+ throw sample_exception(msg);
+ }
+
+ err = loop_start();
+ if (err) {
+ string msg = "Could not start network loop thread: ";
+ msg += mosqpp::strerror(err);
+ throw sample_exception(msg);
+ }
+ }
+
+ void close() {
+ if (connected) {
+ LOG4CPLUS_INFO(logger, "Closing connection");
+
+ should_reconnect = false;
+
+ int rc;
+ if ((rc = disconnect()) != MOSQ_ERR_SUCCESS) {
+ LOG4CPLUS_DEBUG(logger, "Error when disconnecting from broker: " << mosquitto_strerror(rc));
+ }
+ }
+
+ loop_stop(false);
+ }
+
+ void on_connect(int rc) override {
+ LOG4CPLUS_INFO(logger, "Connected");
+ if (rc == MOSQ_ERR_SUCCESS) {
+ connected = true;
+ }
+ }
+
+ void on_disconnect(int rc) override {
+ if (!connected) {
+ return;
+ }
+
+ if (should_reconnect) {
+ LOG4CPLUS_INFO(logger, "Disconnected, reconnecting. Error: " << mosquitto_strerror(rc));
+ rc = reconnect_async();
+ if (rc != MOSQ_ERR_SUCCESS) {
+ LOG4CPLUS_WARN(logger, "Error when reconnecting: " << mosquitto_strerror(rc));
+ }
+ } else {
+ LOG4CPLUS_INFO(logger, "Disconnected");
+ }
+
+ connected = false;
+ }
+
+ void on_log(int level, const char *str) override {
+ log4cplus::LogLevel l;
+
+ if (level == MOSQ_LOG_INFO) {
+ l = log4cplus::INFO_LOG_LEVEL;
+ } else if (level == MOSQ_LOG_NOTICE) {
+ l = log4cplus::INFO_LOG_LEVEL;
+ } else if (level == MOSQ_LOG_WARNING) {
+ l = log4cplus::WARN_LOG_LEVEL;
+ } else if (level == MOSQ_LOG_ERR) {
+ l = log4cplus::FATAL_LOG_LEVEL;
+ } else if (level == MOSQ_LOG_DEBUG) {
+ l = log4cplus::DEBUG_LOG_LEVEL;
+ } else {
+ l = log4cplus::DEBUG_LOG_LEVEL;
+ }
+
+ if ((logger).isEnabledFor(l)) {
+ log4cplus::tostringstream _log4cplus_buf;
+ _log4cplus_buf << "mosquitto: " << str;
+ (logger).forcedLog(l, _log4cplus_buf.str());
+ }
+ }
+
+ void on_publish(int message_id) override {
+ LOG4CPLUS_DEBUG(logger, "message ACKed, message id=" << message_id);
+
+ cv.notify_all();
+ }
+
+ void wait() {
+ std::unique_lock<std::mutex> lk(cv_mutex);
+ cv.wait(lk);
+ }
+
+ void write(SampleRecord const &sample) override {
+ if (sample.empty()) {
+ return;
+ }
+
+ // make a string of the sample
+ auto buf = make_shared<stringstream>();
+ KeyValueSampleOutputStream out(buf, sample.dict);
+ out.write(sample);
+ string s = buf->str();
+
+ cout << "sample: " << s;
+
+ // Publish the message
+ int message_id;
+ const char *message = s.c_str();
+
+ int err;
+ if ((err = publish(&message_id, topic_name.c_str(), (int) s.size(), message, qos, retain)) != MOSQ_ERR_SUCCESS) {
+ LOG4CPLUS_INFO(logger, "Could not publish messaget to topic " << topic_name << ": " << port << ": " << mosqpp::strerror(err));
+ } else {
+ LOG4CPLUS_DEBUG(logger, "Published message, message id=" << message_id);
+ }
+ }
+
+ const string host, topic_name;
+ const unsigned int port;
+ const unsigned int keep_alive;
+
+ const int qos = 2;
+ const bool retain = true;
+
+private:
+ atomic_bool connected, should_reconnect;
+ Logger logger = Logger::getInstance(LOG4CPLUS_TEXT("mqtt"));
+ condition_variable cv;
+ mutex cv_mutex;
+};
+
+class mosquitto_raii {
+public:
+ mosquitto_raii() {
+ mosquitto_lib_init();
+ }
+
+ ~mosquitto_raii() {
+ mosquitto_lib_init();
+ }
+};
+
+class mqtt_publish : public app {
+public:
+ mqtt_publish() : app("mqtt-publish") {}
+
+ ~mqtt_publish() = default;
+
+ KeyDictionary dict;
+ sample_format_type format;
+ string client_id, host, input_file, topic_name;
+ unsigned int port, keep_alive;
+ bool clean_session;
+
+ void add_options(po::options_description_easy_init &options) override {
+ auto format_opt = po::value(&format)->default_value(sample_format_type::KEY_VALUE);
+ auto input_file_opt = po::value(&input_file)->default_value("-");
+ auto topic_opt = po::value(&topic_name)->required();
+ auto keep_alive_opt = po::value(&keep_alive)->default_value(60);
+
+ options("client-id", po::value(&client_id), "Client id");
+ options("clean-session", po::value(&clean_session), "Instruct the broker to clean any existing session state");
+ options("host", po::value(&host)->default_value("localhost"), "MQTT broker host name");
+ options("port", po::value(&port)->default_value(1883), "MQTT broker port");
+ options("format", format_opt, "Formatting of message format");
+ options("input-file", input_file_opt, "Input file, '-' means stdin");
+ options("topic", topic_opt, "The topic to publish to");
+ options("keep-alive", keep_alive_opt, "How often the broker should send PING messages");
+ }
+
+ int main(app_execution &execution) override {
+ mosquitto_raii mosquitto_raii;
+
+ auto desc = execution.desc;
+ auto vm = execution.vm;
+
+ try {
+ istream *inputStream;
+ if (input_file == "-") {
+ inputStream = &cin;
+ } else {
+ inputStream = new ifstream(input_file);
+ if (inputStream->fail()) {
+ cerr << "Unable to open input file " << input_file << endl;
+ return EXIT_FAILURE;
+ }
+ }
+
+ const char *client_id_ = nullptr;
+
+ if (!vm["client-id"].empty()) {
+ client_id_ = client_id.c_str();
+ } else {
+ clean_session = true;
+ }
+
+ auto output =
+ make_shared<MqttSampleOutputStream>(client_id_, clean_session, host, port, topic_name, keep_alive);
+ auto input = make_shared<KeyValueSampleStreamParser>(output, dict);
+
+ output->connect();
+
+ char data[100];
+ while (!inputStream->eof()) {
+ inputStream->get(data[0]);
+ auto buf = boost::asio::buffer(data, 1);
+ input->process(buf);
+ }
+
+ while (output->want_write()) {
+ output->wait();
+ }
+
+ output->close();
+
+ return EXIT_SUCCESS;
+ } catch (std::runtime_error ex) {
+ cout << "std::runtime_error: " << ex.what() << endl;
+ return EXIT_FAILURE;
+ } catch (std::exception ex) {
+ cout << "std::exception: " << ex.what() << endl;
+ return EXIT_FAILURE;
+ }
+ }
+};
+}
+}
+
+int main(int argc, const char *argv[]) {
+ using namespace trygvis::apps;
+
+ return real_main(new mqtt_publish(), argc, argv);
+}