#include #include #include #include #include #include #include #include #include "SoilMoisture.h" #include "trygvis/sensor.h" #include "trygvis/sensor/io.h" #include "apps.h" namespace trygvis { namespace apps { using namespace std; using namespace std::chrono; using namespace trygvis::apps; using namespace trygvis::sensor; using namespace trygvis::sensor::io; using namespace mosqpp; class MqttSampleOutputStream : public SampleOutputStream, public mosquittopp { public: MqttSampleOutputStream(const char *client_id, bool clean_session, string host, unsigned int port, string topic_name, unsigned int keep_alive) : SampleOutputStream(), mosquittopp(client_id, clean_session), host(host), port(port), topic_name(topic_name), keep_alive(keep_alive), connected(false), should_reconnect(false), unacked_messages_(0){ } ~MqttSampleOutputStream() { close(); }; void connect() { should_reconnect = true; int err; if ((err = mosquittopp::connect_async(host.c_str(), port, keep_alive))) { string msg = "Could not connect to MQTT broker " + host + ":" + std::to_string(port) + ": "; msg += mosqpp::strerror(err); throw sample_exception(msg); } err = loop_start(); if (err) { string msg = "Could not start network loop thread: "; msg += mosqpp::strerror(err); throw sample_exception(msg); } } void close() { if (connected) { LOG4CPLUS_INFO(logger, "Closing connection"); should_reconnect = false; int rc; if ((rc = disconnect()) != MOSQ_ERR_SUCCESS) { LOG4CPLUS_DEBUG(logger, "Error when disconnecting from broker: " << mosquitto_strerror(rc)); } } loop_stop(false); } void on_connect(int rc) override { LOG4CPLUS_INFO(logger, "Connected"); if (rc == MOSQ_ERR_SUCCESS) { connected = true; } } void on_disconnect(int rc) override { if (!connected) { return; } if (should_reconnect) { LOG4CPLUS_INFO(logger, "Disconnected, reconnecting. Error: " << mosquitto_strerror(rc)); rc = reconnect_async(); if (rc != MOSQ_ERR_SUCCESS) { LOG4CPLUS_WARN(logger, "Error when reconnecting: " << mosquitto_strerror(rc)); } } else { LOG4CPLUS_INFO(logger, "Disconnected"); } connected = false; } void on_log(int level, const char *str) override { log4cplus::LogLevel l; if (level == MOSQ_LOG_INFO) { l = log4cplus::INFO_LOG_LEVEL; } else if (level == MOSQ_LOG_NOTICE) { l = log4cplus::INFO_LOG_LEVEL; } else if (level == MOSQ_LOG_WARNING) { l = log4cplus::WARN_LOG_LEVEL; } else if (level == MOSQ_LOG_ERR) { l = log4cplus::FATAL_LOG_LEVEL; } else if (level == MOSQ_LOG_DEBUG) { l = log4cplus::DEBUG_LOG_LEVEL; } else { l = log4cplus::DEBUG_LOG_LEVEL; } if ((logger).isEnabledFor(l)) { log4cplus::tostringstream _log4cplus_buf; _log4cplus_buf << "mosquitto: " << str; (logger).forcedLog(l, _log4cplus_buf.str()); } } void on_publish(int message_id) override { LOG4CPLUS_DEBUG(logger, "message ACKed, message id=" << message_id); cv.notify_all(); unacked_messages_--; } void wait() { std::unique_lock lk(cv_mutex); cv.wait(lk); } int unacked_messages() { return unacked_messages_; } void write(SampleRecord const &sample) override { if (sample.empty()) { return; } // make a string of the sample auto buf = make_shared(); KeyValueSampleOutputStream out(buf, sample.dict); out.write(sample); string s = buf->str(); cout << "sample: " << s; // Publish the message int message_id; const char *message = s.c_str(); int err; if ((err = publish(&message_id, topic_name.c_str(), (int) s.size(), message, qos, retain)) != MOSQ_ERR_SUCCESS) { LOG4CPLUS_INFO(logger, "Could not publish messaget to topic " << topic_name << ": " << port << ": " << mosqpp::strerror(err)); } else { unacked_messages_++; LOG4CPLUS_DEBUG(logger, "Published message, message id=" << message_id); } } const string host, topic_name; const unsigned int port; const unsigned int keep_alive; const int qos = 2; const bool retain = true; private: atomic_bool connected, should_reconnect; Logger logger = Logger::getInstance(LOG4CPLUS_TEXT("mqtt")); atomic_int unacked_messages_; condition_variable cv; mutex cv_mutex; }; class mosquitto_raii { public: mosquitto_raii() { mosquitto_lib_init(); } ~mosquitto_raii() { mosquitto_lib_init(); } }; class mqtt_publish : public app { public: mqtt_publish() : app("mqtt-publish") {} ~mqtt_publish() = default; KeyDictionary dict; sample_format_type format; string client_id, host, input_file, topic_name; unsigned int port, keep_alive; bool clean_session; void add_options(po::options_description_easy_init &options) override { auto format_opt = po::value(&format)->default_value(sample_format_type::KEY_VALUE); auto input_file_opt = po::value(&input_file)->default_value("-"); auto topic_opt = po::value(&topic_name)->required(); auto keep_alive_opt = po::value(&keep_alive)->default_value(60); options("client-id", po::value(&client_id), "Client id"); options("clean-session", po::value(&clean_session), "Instruct the broker to clean any existing session state"); options("host", po::value(&host)->default_value("localhost"), "MQTT broker host name"); options("port", po::value(&port)->default_value(1883), "MQTT broker port"); options("format", format_opt, "Formatting of message format"); options("input-file", input_file_opt, "Input file, '-' means stdin"); options("topic", topic_opt, "The topic to publish to"); options("keep-alive", keep_alive_opt, "How often the broker should send PING messages"); } int main(app_execution &execution) override { mosquitto_raii mosquitto_raii; auto desc = execution.desc; auto vm = execution.vm; try { istream *inputStream; if (input_file == "-") { inputStream = &cin; } else { inputStream = new ifstream(input_file); if (inputStream->fail()) { cerr << "Unable to open input file " << input_file << endl; return EXIT_FAILURE; } } const char *client_id_ = nullptr; if (!vm["client-id"].empty()) { client_id_ = client_id.c_str(); } else { clean_session = true; } auto output = make_shared(client_id_, clean_session, host, port, topic_name, keep_alive); auto input = make_shared(output, dict); output->connect(); char data[100]; while (!inputStream->eof()) { inputStream->get(data[0]); auto buf = boost::asio::buffer(data, (size_t) inputStream->gcount()); input->process(buf); } input->finish(); while (output->unacked_messages()) { output->wait(); } output->close(); return EXIT_SUCCESS; } catch (std::runtime_error ex) { cout << "std::runtime_error: " << ex.what() << endl; return EXIT_FAILURE; } catch (std::exception ex) { cout << "std::exception: " << ex.what() << endl; return EXIT_FAILURE; } } }; } } int main(int argc, const char *argv[]) { using namespace trygvis::apps; return real_main(new mqtt_publish(), argc, argv); }