// sm-mqtt-consumer: subscribes to raw soil-moisture samples on MQTT, parses
// them, stores each measurement in Cassandra (table sm_by_day) and re-publishes
// every sensor value on its own MQTT topic.

#include "cassandra_support.h"
#include "mqtt_support.h"
#include "misc_support.h"

// NOTE(review): the copy under review contained two `#include` directives
// whose targets were missing. The headers below are the ones whose names this
// file uses directly - confirm against the original include list.
#include <boost/asio/buffer.hpp>
#include <boost/program_options.hpp>

#include <chrono>
#include <ctime>
#include <functional>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

namespace sm_mqtt_consumer {

using namespace trygvis::sensor;
using namespace trygvis::sensor::io;
using namespace trygvis::cassandra_support;
using namespace trygvis::mqtt_support;
using namespace trygvis::misc_support;

// Main loop flag. No code visible in this file ever clears it.
static bool should_run;

// MQTT settings; the first four are filled in from the command line in main().
static string mqtt_host;
static int mqtt_port;
static string raw_subscription;
static string sm_topic;
// No command line option sets these two; they keep their zero-initialized values.
static int sm_qos;
static bool sm_retain;

static string keyspace_name = "soil_moisture";

class sm_mqtt_client;

// Set by main() once the Cassandra connection is up; on_message() checks it for null.
static unique_ptr<cassandra_session> current_cassandra_session;
static unique_ptr<sm_mqtt_client> mqtt_client;

/// A single sensor reading: sensor index and its value.
struct sensor_measurement {
    int sensor;
    int value;

    sensor_measurement(int sensor, int value) : sensor(sensor), value(value) {
    }
};

/// All sensor readings reported by one device at one point in time.
struct device_measurement {
    string device;
    timestamp_t timestamp;
    vector<sensor_measurement> sensors;

    device_measurement(const string &device, timestamp_t timestamp, vector<sensor_measurement> &&sensors)
        : device(device), timestamp(timestamp), sensors(std::move(sensors)) {
    }

    /// Returns a one-line description: device, raw timestamp count and every "#sensor=value" pair.
    string str() const {
        stringstream buf;
        buf << "device=" << device;
        buf << ", timestamp=" << timestamp.time_since_epoch().count();
        for (const auto &sensor : sensors) {
            buf << ", #" << sensor.sensor << "=" << sensor.value;
        }
        return buf.str();
    }
};

/// MQTT client that subscribes to `raw_subscription`, forwards every incoming
/// message to a callback and publishes processed values below `sm_topic`.
class sm_mqtt_client : private mosqpp::mosquittopp {
public:
    using callback_t = std::function<void(const struct mosquitto_message *)>;

    /// Starts the mosquitto loop and begins connecting to mqtt_host:mqtt_port
    /// (keepalive 10).
    explicit sm_mqtt_client(callback_t on_message)
        : mosquittopp(), on_message_(std::move(on_message)) {
        cout << "Connecting to " << mqtt_host << ":" << mqtt_port << endl;
        loop_start();
        connect_async(mqtt_host.c_str(), mqtt_port, 10);
    }

    ~sm_mqtt_client() {
        loop_stop(true);
        disconnect();
    }

    /// Publishes each sensor value of `measurement` to
    /// "<sm_topic>/<device>/<sensor>/value" using sm_qos and sm_retain.
    void send_sm_messages(const string &device, const device_measurement &measurement) {
        const string device_topic = sm_topic + "/" + device;

        for (const auto &sensor : measurement.sensors) {
            const auto sensor_topic = device_topic + "/" + to_string(sensor.sensor);

            // TODO: iterate over all values
            const auto payload = std::to_string(sensor.value);
            const auto topic = sensor_topic + "/value";
            this->publish(nullptr, topic.c_str(), static_cast<int>(payload.length()), payload.c_str(), sm_qos,
                          sm_retain);
        }
    }

private:
    callback_t on_message_;
    bool subscribed = false;

    void on_connect(int rc) override {
        cout << "Connected to MQTT broker, rc=" << rc << endl;
        // should_run = false;

        // A non-zero rc means the broker refused the connection. Subscribing
        // then cannot succeed, and marking ourselves as subscribed would make
        // a later successful connect skip the subscription.
        if (rc != 0) {
            return;
        }

        const int qos = 0;
        if (!subscribed) {
            subscribed = true;
            cout << "Subscribing to " << raw_subscription << endl;
            subscribe(nullptr, raw_subscription.c_str(), qos);
        }
    }

    void on_disconnect(int rc) override {
        // Forget the subscription so the next on_connect() subscribes again.
        subscribed = false;
        cout << "Oops, disconnected, rc=" << rc << endl;
    }

    void on_publish(int /*mid*/) override {
    }

    void on_message(const struct mosquitto_message *message) override {
        on_message_(message);
    }

    void on_subscribe(int /*mid*/, int /*qos_count*/, const int * /*granted_qos*/) override {
        cout << "Subscribed" << endl;
    }

    void on_unsubscribe(int /*mid*/) override {
        cout << "Oops, unsubscribed" << endl;
    }

    void on_log(int level, const char *str) override {
        cout << "MQTT: " << level << ":" << str << endl;
    }

    void on_error() override {
        cout << "Oops, error" << endl;
    }
};

/// Executes an INSERT of `measurement` into sm_by_day.
///
/// Bound values: the device, the day ("%Y-%m-%d", derived from the timestamp
/// via std::localtime), the raw timestamp count and a list with one
/// (sensor, value) tuple per reading.
///
/// @return whatever `session->execute2()` returns for the statement.
cassandra_future2* insert_into_sm_by_day(cassandra_session *session, device_measurement &&measurement) {
    cassandra_statement stmt("INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);", 4);

    stmt.bind(0, measurement.device);

    std::time_t t = system_clock::to_time_t(measurement.timestamp);
    char day[100];
    std::strftime(day, sizeof(day), "%Y-%m-%d", std::localtime(&t));
    stmt.bind(1, day);

    stmt.bind(2, measurement.timestamp.time_since_epoch().count());

    cassandra_collection sensors(CASS_COLLECTION_TYPE_LIST, measurement.sensors.size());
    for (const auto &sensor : measurement.sensors) {
        cassandra_tuple tuple(2);
        tuple.set(0, sensor.sensor);
        tuple.set(1, sensor.value);
        sensors.append(std::move(tuple));
    }
    stmt.bind(3, sensors);

    return session->execute2(std::move(stmt));
}

/// Callback for every raw MQTT message.
///
/// Parses the payload into samples. For each sample that has a "device", a
/// valid "timestamp_ms" and at least one of "sensor0".."sensor9", the
/// measurement is inserted into Cassandra (when a session is available) and
/// then published through mqtt_client.
void on_message(const struct mosquitto_message *message) {
    string payload(static_cast<const char *>(message->payload), static_cast<size_t>(message->payloadlen));

    cout << "Got message on " << message->topic << ":" << endl;
    cout << payload << endl;
    cout << "----------------------------------------" << endl;

    KeyDictionary dict;
    // NOTE(review): the type arguments of the two make_shared calls (and any
    // template argument on l_c further down) are not present in the copy under
    // review. They are left exactly as found; restore them from the project
    // headers.
    auto sample_buffer = make_shared();
    auto input = make_shared(sample_buffer, dict);

    mutable_buffers_1 buffer = boost::asio::buffer(message->payload, static_cast<std::size_t>(message->payloadlen));
    input->process(buffer);
    input->finish();

    cout << "sample_buffer->samples: " << sample_buffer->samples.size() << endl;

    auto device_key = dict.indexOf("device");
    auto timestamp_key = dict.indexOf("timestamp_ms");

    for (const auto &sample : sample_buffer->samples) {
        cout << "Sample: " << sample.to_string() << endl;

        auto deviceO = sample.at(device_key);
        auto timestampS = sample.at(timestamp_key);

        if (!deviceO) {
            cout << "Missing required key 'device'" << endl;
            continue;
        }

        if (!timestampS) {
            cout << "Missing required key 'timestamp_ms'" << endl;
            continue;
        }

        auto device = deviceO.get();
        auto timestamp = flat_map(timestampS, l_c);

        if (!timestamp) {
            cout << "Invalid value for 'timestamp_ms'" << endl;
            continue;
        }

        // Collect whichever of sensor0..sensor9 are present and parseable.
        vector<sensor_measurement> sensors;
        for (int i = 0; i < 10; i++) {
            auto valueS = sample.at(dict.indexOf("sensor" + to_string(i)));
            auto value = flat_map(valueS, l_c);

            if (!value) {
                continue;
            }

            sensors.emplace_back(i, value.get());
        }

        if (sensors.empty()) {
            continue;
        }

        timestamp_t ts(milliseconds(timestamp.get()));
        device_measurement measurement(device, ts, std::move(sensors));

        cout << "Measurement: " << measurement.str() << endl;

        if (current_cassandra_session) {
            // Hand over a copy: `measurement` is still needed for the MQTT
            // publish below, so it must not be given away as an rvalue.
            // The completion callback uses no locals, hence the empty capture.
            insert_into_sm_by_day(current_cassandra_session.get(), device_measurement(measurement))
                ->then([](auto &f) {
                    if (f) {
                        cout << "INSERT INTO sm_by_day OK" << endl;
                    } else {
                        cout << "INSERT INTO sm_by_day failed: " << f.error_message() << endl;
                    }
                });
        } else {
            cout << "Not connected to Cassandra" << endl;
        }

        mqtt_client->send_sm_messages(device, measurement);
    }
}

/// Program entry point: parses the command line, starts the MQTT client,
/// connects to Cassandra, selects the keyspace and then sleeps in 60 second
/// steps for as long as `should_run` is true.
///
/// @return EXIT_FAILURE on --help, bad options or a failed Cassandra
///         connection; 0 when the main loop ends.
int main(int argc, const char **argv) {
    mqtt_lib mqtt_lib;
    cassandra_logging cassandra_logging;

    string cassandra_cluster;

    po::options_description all("Options");
    all.add_options()("help", "Show command options");
    all.add_options()("cassandra-cluster", po::value<>(&cassandra_cluster)->default_value("127.0.0.1"));
    all.add_options()("mqtt-host", po::value<>(&mqtt_host)->default_value("trygvis.io"));
    all.add_options()("mqtt-port", po::value<>(&mqtt_port)->default_value(1883));
    all.add_options()("raw-subscription", po::value<>(&raw_subscription)->default_value("/soil-moisture/raw/#"));
    all.add_options()("sm-topic", po::value<>(&sm_topic)->default_value("/soil-moisture/device"));

    po::variables_map vm;

    try {
        auto parsed = po::parse_command_line(argc, argv, all);
        po::store(parsed, vm);
        po::notify(vm);

        auto unrecognized = po::collect_unrecognized(parsed.options, po::include_positional);

        if (vm.count("help")) {
            cerr << all << endl;
            return EXIT_FAILURE;
        }

        if (!unrecognized.empty()) {
            cerr << "Unrecognized option: " << unrecognized.at(0) << endl;
            cerr << all << endl;
            return EXIT_FAILURE;
        }
    } catch (const po::required_option &e) {
        cerr << "Missing required option: " << e.get_option_name() << endl;
        cerr << all << endl;
        return EXIT_FAILURE;
    } catch (const po::unknown_option &e) {
        cerr << e.what() << endl;
        return EXIT_FAILURE;
    } catch (const po::error &e) {
        // Any other option error (e.g. a non-numeric --mqtt-port) would
        // otherwise escape main() and terminate the process.
        cerr << e.what() << endl;
        return EXIT_FAILURE;
    }

    mqtt_client = make_unique<sm_mqtt_client>(on_message);

    // Own the cluster object so it is freed on every return path. It is
    // declared before `session` and therefore destroyed after it.
    std::unique_ptr<CassCluster, decltype(&cass_cluster_free)> cluster(cass_cluster_new(), &cass_cluster_free);
    auto session = make_unique<cassandra_session>();

    cout << "Connecting to cassandra cluster: " << cassandra_cluster << endl;
    cass_cluster_set_contact_points(cluster.get(), cassandra_cluster.c_str());

    // NOTE(review): who owns the returned future is not visible here; it is
    // left unfreed as before - confirm whether cass_future_free() is needed.
    CassFuture *connect_future = session->connect(cluster.get());

    if (cass_future_error_code(connect_future) != CASS_OK) {
        string s = cassandra_future::error_message(connect_future);
        cerr << "Could not connect to Cassandra:" << s << endl;
        return EXIT_FAILURE;
    }

    execute_query(session->underlying(), "USE " + keyspace_name);

    cout << "Connected to Cassandra" << endl;

    current_cassandra_session = std::move(session);

    should_run = true;

    while (should_run) {
        cout << "sleeping.." << endl;
        std::this_thread::sleep_for(60s);
    }

    // Destroy the MQTT client first (its destructor stops the loop and
    // disconnects), then the Cassandra session that on_message() uses.
    // reset() destroys the session; release() would only drop the pointer.
    mqtt_client.reset();
    current_cassandra_session.reset();

    return 0;
}

} // namespace sm_mqtt_consumer

int main(int argc, const char **argv) {
    return sm_mqtt_consumer::main(argc, argv);
}