#include #include #include #include #include #include #include #include #include "mqtt_support.h" #include "SoilMoisture.h" #include "trygvis/sensor.h" #include "trygvis/sensor/io.h" #include "apps.h" namespace trygvis { namespace apps { using namespace std; using namespace std::chrono; using namespace trygvis::apps; using namespace trygvis::sensor; using namespace trygvis::sensor::io; using namespace trygvis::mqtt_support; class MqttSampleOutputStream : public SampleConsumer { public: MqttSampleOutputStream(const o &client_id, bool clean_session, string host, unsigned int port, string topic_name, unsigned int keep_alive) : SampleConsumer(), client(host, port, keep_alive, client_id, clean_session), topic_name(topic_name) { client.connect(); } ~MqttSampleOutputStream() { client.disconnect(); }; void onSample(SampleRecord const &sample) override { if (sample.empty()) { return; } // make a string of the sample auto buf = make_shared(); KeyValueWriterSampleConsumer out(buf, sample.dict); out.onSample(sample); string s = buf->str(); cout << "sample: " << s; // Publish the message int message_id; const char *message = s.c_str(); client.publish(&message_id, topic_name, qos, retain, static_cast(s.length()), s.c_str()); } const string topic_name; mqtt_client client; const int qos = 2; const bool retain = true; }; class mqtt_publish : public app { public: mqtt_publish() : app("mqtt-publish") { } ~mqtt_publish() = default; KeyDictionary dict; sample_format_type format; string client_id, host, input_file, topic_name; unsigned int port, keep_alive; bool clean_session; void add_options(po::options_description_easy_init &options) override { auto format_opt = po::value(&format)->default_value(sample_format_type::KEY_VALUE); auto input_file_opt = po::value(&input_file)->default_value("-"); auto topic_opt = po::value(&topic_name)->required(); auto keep_alive_opt = po::value(&keep_alive)->default_value(60); options("client-id", po::value(&client_id), "Client id"); options("clean-session", po::value(&clean_session), "Instruct the broker to clean any existing session state"); options("host", po::value(&host)->default_value("localhost"), "MQTT broker host name"); options("port", po::value(&port)->default_value(1883), "MQTT broker port"); options("format", format_opt, "Formatting of message format"); options("input-file", input_file_opt, "Input file, '-' means stdin"); options("topic", topic_opt, "The topic to publish to"); options("keep-alive", keep_alive_opt, "How often the broker should send PING messages"); } int main(app_execution &execution) override { mqtt_lib mqtt_lib; auto desc = execution.desc; auto vm = execution.vm; try { istream *inputStream; if (input_file == "-") { inputStream = &cin; } else { inputStream = new ifstream(input_file); if (inputStream->fail()) { cerr << "Unable to open input file " << input_file << endl; return EXIT_FAILURE; } } o client_id_; if (!vm["client-id"].empty()) { client_id_ = client_id; } else { clean_session = true; } auto output = make_shared(client_id_, clean_session, host, port, topic_name, keep_alive); auto input = make_shared(output, dict); // while (!output->client.connected()) { // cout << "Waiting for connection" << endl; // output->client.wait(); // } char data[100]; while (!inputStream->eof()) { inputStream->get(data[0]); cout << "got data: " << inputStream->gcount() << endl; auto buf = boost::asio::buffer(data, (size_t) inputStream->gcount()); input->process(buf); } input->finish(); while (output->client.unacked_messages()) { cout << "finishing.. unacked messages: " << output->client.unacked_messages() << endl; output->client.wait(); } return EXIT_SUCCESS; } catch (std::runtime_error &ex) { cout << "std::runtime_error: " << ex.what() << endl; return EXIT_FAILURE; } catch (std::exception &ex) { cout << "std::exception: " << ex.what() << endl; cout << "typeid: " << typeid(ex).name() << endl; return EXIT_FAILURE; } } }; } } int main(int argc, const char *argv[]) { using namespace trygvis::apps; return real_main(new mqtt_publish(), argc, argv); }