From 5dbc69fb16c71c02859ec8665c55a1e2b068ddd7 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Mon, 20 Jul 2015 22:33:39 +0200 Subject: o Adding an application to send samples over MQTT. o Improved CMake build script, better detection and error messages of headers/libraries. Conditionally adding the applications that can be compiled with the given set of found libraries. --- apps/mqtt-publish.cpp | 275 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 275 insertions(+) create mode 100644 apps/mqtt-publish.cpp (limited to 'apps/mqtt-publish.cpp') diff --git a/apps/mqtt-publish.cpp b/apps/mqtt-publish.cpp new file mode 100644 index 0000000..63ce998 --- /dev/null +++ b/apps/mqtt-publish.cpp @@ -0,0 +1,275 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "SoilMoisture.h" +#include "trygvis/sensor.h" +#include "trygvis/sensor/io.h" +#include "apps.h" + +namespace trygvis { +namespace apps { + +using namespace std; +using namespace std::chrono; +using namespace trygvis::apps; +using namespace trygvis::sensor; +using namespace trygvis::sensor::io; +using namespace mosqpp; + +class MqttSampleOutputStream : public SampleOutputStream, public mosquittopp { +public: + MqttSampleOutputStream(const char *client_id, bool clean_session, string host, unsigned int port, string topic_name, + unsigned int keep_alive) + : SampleOutputStream(), + mosquittopp(client_id, clean_session), + host(host), + port(port), + topic_name(topic_name), + keep_alive(keep_alive), + connected(false), + should_reconnect(false) { + } + + ~MqttSampleOutputStream() { + close(); + }; + + void connect() { + should_reconnect = true; + int err; + if ((err = mosquittopp::connect_async(host.c_str(), port, keep_alive))) { + string msg = "Could not connect to MQTT broker " + host + ":" + std::to_string(port) + ": "; + msg += mosqpp::strerror(err); + throw sample_exception(msg); + } + + err = loop_start(); + if (err) { + string msg = "Could not start network loop thread: "; + msg += mosqpp::strerror(err); + throw sample_exception(msg); + } + } + + void close() { + if (connected) { + LOG4CPLUS_INFO(logger, "Closing connection"); + + should_reconnect = false; + + int rc; + if ((rc = disconnect()) != MOSQ_ERR_SUCCESS) { + LOG4CPLUS_DEBUG(logger, "Error when disconnecting from broker: " << mosquitto_strerror(rc)); + } + } + + loop_stop(false); + } + + void on_connect(int rc) override { + LOG4CPLUS_INFO(logger, "Connected"); + if (rc == MOSQ_ERR_SUCCESS) { + connected = true; + } + } + + void on_disconnect(int rc) override { + if (!connected) { + return; + } + + if (should_reconnect) { + LOG4CPLUS_INFO(logger, "Disconnected, reconnecting. Error: " << mosquitto_strerror(rc)); + rc = reconnect_async(); + if (rc != MOSQ_ERR_SUCCESS) { + LOG4CPLUS_WARN(logger, "Error when reconnecting: " << mosquitto_strerror(rc)); + } + } else { + LOG4CPLUS_INFO(logger, "Disconnected"); + } + + connected = false; + } + + void on_log(int level, const char *str) override { + log4cplus::LogLevel l; + + if (level == MOSQ_LOG_INFO) { + l = log4cplus::INFO_LOG_LEVEL; + } else if (level == MOSQ_LOG_NOTICE) { + l = log4cplus::INFO_LOG_LEVEL; + } else if (level == MOSQ_LOG_WARNING) { + l = log4cplus::WARN_LOG_LEVEL; + } else if (level == MOSQ_LOG_ERR) { + l = log4cplus::FATAL_LOG_LEVEL; + } else if (level == MOSQ_LOG_DEBUG) { + l = log4cplus::DEBUG_LOG_LEVEL; + } else { + l = log4cplus::DEBUG_LOG_LEVEL; + } + + if ((logger).isEnabledFor(l)) { + log4cplus::tostringstream _log4cplus_buf; + _log4cplus_buf << "mosquitto: " << str; + (logger).forcedLog(l, _log4cplus_buf.str()); + } + } + + void on_publish(int message_id) override { + LOG4CPLUS_DEBUG(logger, "message ACKed, message id=" << message_id); + + cv.notify_all(); + } + + void wait() { + std::unique_lock lk(cv_mutex); + cv.wait(lk); + } + + void write(SampleRecord const &sample) override { + if (sample.empty()) { + return; + } + + // make a string of the sample + auto buf = make_shared(); + KeyValueSampleOutputStream out(buf, sample.dict); + out.write(sample); + string s = buf->str(); + + cout << "sample: " << s; + + // Publish the message + int message_id; + const char *message = s.c_str(); + + int err; + if ((err = publish(&message_id, topic_name.c_str(), (int) s.size(), message, qos, retain)) != MOSQ_ERR_SUCCESS) { + LOG4CPLUS_INFO(logger, "Could not publish messaget to topic " << topic_name << ": " << port << ": " << mosqpp::strerror(err)); + } else { + LOG4CPLUS_DEBUG(logger, "Published message, message id=" << message_id); + } + } + + const string host, topic_name; + const unsigned int port; + const unsigned int keep_alive; + + const int qos = 2; + const bool retain = true; + +private: + atomic_bool connected, should_reconnect; + Logger logger = Logger::getInstance(LOG4CPLUS_TEXT("mqtt")); + condition_variable cv; + mutex cv_mutex; +}; + +class mosquitto_raii { +public: + mosquitto_raii() { + mosquitto_lib_init(); + } + + ~mosquitto_raii() { + mosquitto_lib_init(); + } +}; + +class mqtt_publish : public app { +public: + mqtt_publish() : app("mqtt-publish") {} + + ~mqtt_publish() = default; + + KeyDictionary dict; + sample_format_type format; + string client_id, host, input_file, topic_name; + unsigned int port, keep_alive; + bool clean_session; + + void add_options(po::options_description_easy_init &options) override { + auto format_opt = po::value(&format)->default_value(sample_format_type::KEY_VALUE); + auto input_file_opt = po::value(&input_file)->default_value("-"); + auto topic_opt = po::value(&topic_name)->required(); + auto keep_alive_opt = po::value(&keep_alive)->default_value(60); + + options("client-id", po::value(&client_id), "Client id"); + options("clean-session", po::value(&clean_session), "Instruct the broker to clean any existing session state"); + options("host", po::value(&host)->default_value("localhost"), "MQTT broker host name"); + options("port", po::value(&port)->default_value(1883), "MQTT broker port"); + options("format", format_opt, "Formatting of message format"); + options("input-file", input_file_opt, "Input file, '-' means stdin"); + options("topic", topic_opt, "The topic to publish to"); + options("keep-alive", keep_alive_opt, "How often the broker should send PING messages"); + } + + int main(app_execution &execution) override { + mosquitto_raii mosquitto_raii; + + auto desc = execution.desc; + auto vm = execution.vm; + + try { + istream *inputStream; + if (input_file == "-") { + inputStream = &cin; + } else { + inputStream = new ifstream(input_file); + if (inputStream->fail()) { + cerr << "Unable to open input file " << input_file << endl; + return EXIT_FAILURE; + } + } + + const char *client_id_ = nullptr; + + if (!vm["client-id"].empty()) { + client_id_ = client_id.c_str(); + } else { + clean_session = true; + } + + auto output = + make_shared(client_id_, clean_session, host, port, topic_name, keep_alive); + auto input = make_shared(output, dict); + + output->connect(); + + char data[100]; + while (!inputStream->eof()) { + inputStream->get(data[0]); + auto buf = boost::asio::buffer(data, 1); + input->process(buf); + } + + while (output->want_write()) { + output->wait(); + } + + output->close(); + + return EXIT_SUCCESS; + } catch (std::runtime_error ex) { + cout << "std::runtime_error: " << ex.what() << endl; + return EXIT_FAILURE; + } catch (std::exception ex) { + cout << "std::exception: " << ex.what() << endl; + return EXIT_FAILURE; + } + } +}; +} +} + +int main(int argc, const char *argv[]) { + using namespace trygvis::apps; + + return real_main(new mqtt_publish(), argc, argv); +} -- cgit v1.2.3