aboutsummaryrefslogtreecommitdiff
path: root/sm-mqtt-consumer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'sm-mqtt-consumer.cpp')
-rw-r--r--sm-mqtt-consumer.cpp328
1 files changed, 328 insertions, 0 deletions
diff --git a/sm-mqtt-consumer.cpp b/sm-mqtt-consumer.cpp
new file mode 100644
index 0000000..d7dbfdc
--- /dev/null
+++ b/sm-mqtt-consumer.cpp
@@ -0,0 +1,328 @@
+#include "cassandra_support.h"
+#include "mosquittopp.h"
+#include "trygvis/sensor/io.h"
+#include <thread>
+#include <boost/lexical_cast.hpp>
+#include <boost/program_options.hpp>
+
+using namespace std;
+using namespace std::chrono;
+using namespace trygvis::sensor;
+using namespace trygvis::sensor::io;
+using namespace trygvis::cassandra_support;
+using namespace boost;
+namespace po = boost::program_options;
+
+static bool should_run;
+static string mqtt_broker_host;
+static auto mqtt_broker_port = 1883;
+static auto queue_name = "/trygvis";
+static string keyspace_name = "soil_moisture";
+
+static unique_ptr<cassandra_session> current_cassandra_session;
+
+struct sensor_measurement {
+ int sensor;
+ int value;
+
+ sensor_measurement(int sensor, int value) : sensor(sensor), value(value) {
+ }
+
+ ~sensor_measurement() = default;
+};
+
+struct device_measurement {
+ string device;
+ long timestamp;
+ vector<sensor_measurement> sensors;
+
+ device_measurement(string &device, long timestamp, vector<sensor_measurement> &&sensors) :
+ device(device), timestamp(timestamp), sensors(std::move(sensors)) {
+ };
+
+ ~device_measurement() = default;
+
+ string str() {
+ stringstream buf;
+ buf << "device=" << device;
+ buf << ", timestamp=" << timestamp;
+ std::for_each(sensors.begin(), sensors.end(), [&](auto &sensor) {
+ buf << ", #" << sensor.sensor << "=" + sensor.value;
+ });
+ return buf.str();
+ }
+};
+
+string &&to_string(CassFuture *f) {
+ const char *message;
+ size_t message_length;
+ cass_future_error_message(f, &message, &message_length);
+
+ return std::move(string(message, message_length));
+}
+
+class mqtt_lib {
+public:
+ mqtt_lib() {
+ mosquitto_lib_init();
+ }
+
+ ~mqtt_lib() {
+ mosquitto_lib_cleanup();
+ }
+};
+
+class mqtt_client : private mosqpp::mosquittopp {
+public:
+ mqtt_client(std::function<void(const struct mosquitto_message *)> on_message_) : mosquittopp(),
+ on_message_(on_message_) {
+ cout << "Connecting to " << mqtt_broker_host << ":" << mqtt_broker_port << endl;
+ loop_start();
+ connect_async(mqtt_broker_host.c_str(), mqtt_broker_port, 10);
+ }
+
+ ~mqtt_client() {
+ loop_stop(true);
+ disconnect();
+ }
+
+private:
+ std::function<void(const struct mosquitto_message *)> on_message_;
+ bool subscribed = false;
+
+ void on_connect(int rc) override {
+ cout << "Connected to MQTT broker, rc=" << rc << endl;
+// should_run = false;
+ int qos = 0;
+ if (!subscribed) {
+ subscribed = true;
+ cout << "Subscribing..." << endl;
+ subscribe(nullptr, queue_name, qos);
+ }
+ }
+
+ void on_disconnect(int rc) override {
+ subscribed = false;
+
+ cout << "Oops, disconnected, rc=" << rc << endl;
+ }
+
+ void on_publish(int mid) override {
+ }
+
+ void on_message(const struct mosquitto_message *message) override {
+ string payload((const char *) message->payload, (size_t) message->payloadlen);
+ on_message_(message);
+ }
+
+ void on_subscribe(int mid, int qos_count, const int *granted_qos) override {
+ cout << "Subscribed" << endl;
+ }
+
+ void on_unsubscribe(int mid) override {
+ cout << "Oops, unsubscribed" << endl;
+ }
+
+ void on_log(int level, const char *str) override {
+ cout << "MQTT: " << level << ":" << str << endl;
+ }
+
+ void on_error() override {
+ cout << "Oops, error" << endl;
+ }
+};
+
+void print_error(CassFuture *future) {
+ cout << "Cassandra error: " << error_message(future) << endl;
+}
+
+auto insert_into_sm_by_day(CassSession *session, device_measurement &&measurement) {
+ cassandra_statement q("INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);", 4);
+
+ q.bind(0, measurement.device);
+
+ std::time_t t = measurement.timestamp;
+ char day[100];
+ std::strftime(day, sizeof(day), "%Y-%m-%d", std::localtime(&t));
+ q.bind(1, day);
+
+ q.bind(2, measurement.timestamp * 1000);
+
+ cassandra_collection sensors(CASS_COLLECTION_TYPE_LIST, measurement.sensors.size());
+ for_each(measurement.sensors.cbegin(), measurement.sensors.cend(), [&](auto sensor) {
+ cassandra_tuple tuple(2);
+ tuple.set(0, sensor.sensor);
+ tuple.set(1, sensor.value);
+ sensors.append(std::move(tuple));
+ });
+
+ q.bind(3, sensors);
+
+ return cass_session_execute(session, q.statement);
+}
+
+template<typename Target, typename Source>
+boost::optional<Target> map(boost::optional<Source> &a, std::function<Target(Source)> f) {
+ if (!a.is_initialized()) {
+ return boost::none;
+ }
+
+ return make_optional(f(a));
+}
+
+template<typename Target, typename Source>
+boost::optional<Target> flat_map(boost::optional<Source> &a, boost::optional<Target> (&f)(Source)) {
+ if (!a.is_initialized()) {
+ return boost::none;
+ }
+
+ return f(a.get());
+}
+
+template<typename Target, typename Source = string>
+boost::optional<Target> l_c(const Source source) {
+ try {
+ return boost::lexical_cast<Target>(source);
+ } catch (bad_lexical_cast &e) {
+ return boost::none;
+ }
+};
+
+void on_message(const struct mosquitto_message *message) {
+ string payload((const char *) message->payload, (size_t) message->payloadlen);
+ cout << "storing message: " << endl;
+ cout << payload << endl;
+ cout << "----------------------------------------" << endl;
+
+ KeyDictionary dict;
+ auto sample_buffer = make_shared<VectorSampleOutputStream>();
+ auto input = make_shared<KeyValueSampleStreamParser>(sample_buffer, dict);
+
+ mutable_buffers_1 buffer = boost::asio::buffer(message->payload, (std::size_t) message->payloadlen);
+
+ input->process(buffer);
+ input->finish();
+
+ cout << "sample_buffer->samples: " << sample_buffer->samples.size() << endl;
+
+ auto device_key = dict.indexOf("device");
+ auto timestamp_key = dict.indexOf("timestamp");
+
+ std::for_each(sample_buffer->samples.cbegin(), sample_buffer->samples.cend(), [&](auto &sample) {
+ cout << "Sample: " << sample.to_string() << endl;
+
+ auto deviceO = sample.at(device_key);
+ auto timestampS = sample.at(timestamp_key);
+
+ if (!deviceO) {
+ cout << "Missing required key 'device'" << endl;
+ }
+
+ if (!timestampS) {
+ cout << "Missing required key 'timestamp'" << endl;
+ }
+
+ auto device = deviceO.get();
+
+ auto timestamp = flat_map(timestampS, l_c<long>);
+ if (!timestamp) {
+ cout << "Invalid value for 'timestamp'" << endl;
+ }
+
+ vector<sensor_measurement> sensors;
+
+ for (int i = 0; i < 10; i++) {
+ auto valueS = sample.at(dict.indexOf("sensor" + to_string(i)));
+
+ auto value = flat_map(valueS, l_c<int>);
+
+ if (!value) {
+ continue;
+ }
+
+ sensors.emplace_back(i, value.get());
+ }
+
+ if (sensors.size() == 0) {
+ return;
+ }
+
+ device_measurement measurement(device, timestamp.get(), std::move(sensors));
+
+ cout << "Measurement: " << measurement.str() << endl;
+
+ if (current_cassandra_session) {
+ handle_future(insert_into_sm_by_day(current_cassandra_session->session, std::move(measurement)), [&](auto future) {
+ cout << "Success!" << endl;
+ }, [&](auto future, auto err) {
+ cout << "Failure: " << error_message(future) << endl;
+ });
+ } else {
+ cout << "Not connected to Cassandra" << endl;
+ }
+ });
+}
+
+int main(int argc, const char **argv) {
+ mqtt_lib mqtt_lib();
+
+ string cassandra_cluster;
+ po::options_description all("Options");
+ all.add_options()("cassandra-cluster", po::value<string>(&cassandra_cluster)->default_value("127.0.0.1"));
+ all.add_options()("mqtt-broker-host", po::value<string>(&mqtt_broker_host)->default_value("trygvis.io"));
+
+ po::variables_map vm;
+ try {
+ auto parsed = po::parse_command_line(argc, argv, all);
+ po::store(parsed, vm);
+ po::notify(vm);
+ auto unrecognized = po::collect_unrecognized(parsed.options, po::include_positional);
+
+ if (vm.count("help")) {
+ cerr << all << "\n";
+ return EXIT_FAILURE;
+ }
+
+ if (unrecognized.size()) {
+ cerr << "Unrecognized option: " << unrecognized.at(0) << "\n";
+ return EXIT_FAILURE;
+ }
+
+ } catch (po::required_option &e) {
+ cerr << "Missing required option: " << e.get_option_name() << endl;
+ cerr << all << endl;
+ } catch (po::unknown_option &e) {
+ cerr << e.what() << endl;
+ return EXIT_FAILURE;
+ }
+
+ mqtt_client mqtt_client(on_message);
+ CassFuture *connect_future = nullptr;
+ CassCluster *cluster = cass_cluster_new();
+ auto session = make_unique<cassandra_session>();
+
+ cass_cluster_set_contact_points(cluster, cassandra_cluster.c_str());
+
+ connect_future = cass_session_connect(session->session, cluster);
+
+ if (cass_future_error_code(connect_future) != CASS_OK) {
+ string s = to_string(connect_future);
+ cerr << "Could not connect to Cassandra:" << s << endl;
+ return EXIT_FAILURE;
+ }
+
+ cout << "Connected to Cassandra" << endl;
+ current_cassandra_session = std::move(session);
+
+ execute_query(current_cassandra_session->session, "USE " + keyspace_name);
+
+ should_run = true;
+ while (should_run) {
+ cout << "sleeping.." << endl;
+ std::this_thread::sleep_for(60s);
+ }
+
+ current_cassandra_session.release();
+
+ return 0;
+}