#include "cassandra_support.h" #include "mqtt_support.h" #include "misc_support.h" #include #include #include namespace raw_mqtt_consumer { using namespace trygvis::cassandra_support; using namespace trygvis::mqtt_support; using namespace trygvis::misc_support; using namespace trygvis::sensor; using namespace trygvis::sensor::io; using namespace boost; namespace po = boost::program_options; static bool should_run; static string mqtt_host; static int mqtt_port; static string mqtt_topic; static string keyspace_name = "soil_moisture"; static unique_ptr current_cassandra_session; class raw_mqtt_client /*: private mosqpp::mosquittopp*/ { public: typedef std::function callback_t; raw_mqtt_client(callback_t on_message_) : on_message_(on_message_) { cout << "Connecting to " << mqtt_host << ":" << mqtt_port << endl; mosquitto = mosquitto_new(nullptr, true, this); if(!mosquitto) { throw runtime_error("Could not initialize mosquitto instance"); } mosquitto_connect_callback_set(mosquitto, on_connect_cb); mosquitto_disconnect_callback_set(mosquitto, on_disconnect_cb); mosquitto_publish_callback_set(mosquitto, on_publish_cb); mosquitto_message_callback_set(mosquitto, on_message_cb); mosquitto_subscribe_callback_set(mosquitto, on_subscribe_cb); mosquitto_unsubscribe_callback_set(mosquitto, on_unsubscribe_cb); mosquitto_log_callback_set(mosquitto, on_log_cb); mosquitto_loop_start(mosquitto); mosquitto_connect_async(mosquitto, mqtt_host.c_str(), mqtt_port, 10); } ~raw_mqtt_client() { mosquitto_loop_stop(mosquitto, true); mosquitto_disconnect(mosquitto); } private: callback_t on_message_; bool subscribed = false; struct mosquitto *mosquitto; static void on_connect_cb(struct mosquitto *m, void *self, int rc) { static_cast(self)->on_connect(rc); } void on_connect(int rc) { cout << "Connected to MQTT broker, rc=" << rc << endl; // should_run = false; int qos = 0; if (!subscribed) { subscribed = true; cout << "Subscribing..." << endl; mosquitto_subscribe(mosquitto, nullptr, mqtt_topic.c_str(), qos); } } static void on_disconnect_cb(struct mosquitto *m, void *self, int rc) { static_cast(self)->on_disconnect(rc); } void on_disconnect(int rc) { subscribed = false; cout << "Oops, disconnected, rc=" << rc << ":" << mosqpp::strerror(rc) << endl; } static void on_publish_cb(struct mosquitto *m, void *self, int rc) { static_cast(self)->on_publish(rc); } void on_publish(int mid) { } static void on_message_cb(struct mosquitto *m, void *self, const mosquitto_message *message) { static_cast(self)->on_message(message); } void on_message(const struct mosquitto_message *message) { on_message_(message); } static void on_subscribe_cb(struct mosquitto *m, void *self, int mid, int qos_count, const int *granted_qos) { static_cast(self)->on_subscribe(mid, qos_count, granted_qos); } void on_subscribe(int mid, int qos_count, const int *granted_qos) { cout << "Subscribed" << endl; } static void on_unsubscribe_cb(struct mosquitto *m, void *self, int mid) { static_cast(self)->on_unsubscribe(mid); } void on_unsubscribe(int mid) { cout << "Oops, unsubscribed" << endl; } static void on_log_cb(struct mosquitto *m, void *self, int level, const char *str) { static_cast(self)->on_log(level, str); } void on_log(int level, const char *str) { cout << "MQTT: " << level << ":" << str << endl; } }; cassandra_future2 *insert_into_raw(unique_ptr &session, const struct mosquitto_message *message, const SampleRecord &record) { cassandra_statement q("INSERT INTO raw_record(day, timestamp, mid, topic, qos, records) VALUES (?, ?, ?, ?, ?, ?);", 6); auto system_now = system_clock::now(); char day[100]; std::time_t t = system_clock::to_time_t(system_now); std::strftime(day, sizeof(day), "%Y-%m-%d", std::localtime(&t)); q.bind_string(0, day); cout << "day=" << day << endl; auto now_ms = std::chrono::time_point_cast(system_now); auto timestamp = now_ms.time_since_epoch().count(); q.bind_int64(1, timestamp); cout << "timestamp=" << timestamp << endl; q.bind_int64(2, message->mid); q.bind_string(3, message->topic); q.bind_int32(4, (cass_int32_t)message->qos); auto buf = make_shared(); auto output = trygvis::sensor::io::open_sample_output_stream(buf, record.dict, sample_format_type::KEY_VALUE); output->write(record); cassandra_collection records(CASS_COLLECTION_TYPE_LIST, record.dict.size()); for_each(record.dict.begin(), record.dict.end(), [&](const SampleKey *key) { cassandra_tuple tuple(2); tuple.set(0, key->name); o value = record.at(key); if (value) { tuple.set(1, value.get()); } records.append(std::move(tuple)); }); q.bind_collection(5, records); return session->execute2(std::move(q)); } void on_message(const struct mosquitto_message *message) { string payload((const char *) message->payload, (size_t) message->payloadlen); cout << "storing message: " << endl; cout << payload << endl; cout << "----------------------------------------" << endl; KeyDictionary dict; auto sample_buffer = make_shared(); auto input = make_shared(sample_buffer, dict); mutable_buffers_1 buffer = boost::asio::buffer(message->payload, (std::size_t) message->payloadlen); input->process(buffer); input->finish(); cout << "sample_buffer->samples: " << sample_buffer->samples.size() << endl; if (current_cassandra_session) { std::for_each(sample_buffer->samples.cbegin(), sample_buffer->samples.cend(), [&](auto &sample) { cout << "Sample: " << sample.to_string() << endl; insert_into_raw(current_cassandra_session, message, sample)->then([&](auto &f) { if (f) cout << "Success!" << endl; else { cout << "Error: " << f.error_message() << endl; } }); cout << "sample insert scheduled" << endl; }); } else { cout << "Not connected to Cassandra" << endl; } } int main(int argc, const char **argv) { mqtt_lib mqtt_lib; cassandra_logging cassandra_logging; string cassandra_cluster; po::options_description all("Options"); all.add_options()("help", "Show command options"); all.add_options()("cassandra-cluster", po::value(&cassandra_cluster)->default_value("127.0.0.1")); all.add_options()("mqtt-host", po::value<>(&mqtt_host)->default_value("trygvis.io")); all.add_options()("mqtt-port", po::value<>(&mqtt_port)->default_value(1883)); all.add_options()("mqtt-topic", po::value<>(&mqtt_topic)->required()); po::variables_map vm; try { auto parsed = po::parse_command_line(argc, argv, all); po::store(parsed, vm); po::notify(vm); auto unrecognized = po::collect_unrecognized(parsed.options, po::include_positional); if (vm.count("help")) { cerr << all << endl; return EXIT_FAILURE; } if (unrecognized.size()) { cerr << "Unrecognized option: " << unrecognized.at(0) << endl; cerr << all << endl; return EXIT_FAILURE; } } catch (po::required_option &e) { cerr << "Missing required option: " << e.get_option_name() << endl; cerr << all << endl; return EXIT_FAILURE; } catch (po::unknown_option &e) { cerr << e.what() << endl; return EXIT_FAILURE; } CassCluster *cluster = cass_cluster_new(); auto session = make_unique(); cout << "Connecting to Cassandra at " << cassandra_cluster << endl; cass_cluster_set_contact_points(cluster, cassandra_cluster.c_str()); auto connect_future = session->connect(cluster); if (cass_future_error_code(connect_future) != CASS_OK) { string s = cassandra_future::error_message(connect_future); cerr << "Could not connect to Cassandra: " << s << endl; return EXIT_FAILURE; } execute_query(session->underlying(), "USE " + keyspace_name); cout << "Connected to Cassandra" << endl; current_cassandra_session = std::move(session); cout << "Connecting to MQTT broker" << endl; raw_mqtt_client mqtt_client(on_message); should_run = true; while (should_run) { cout << "sleeping.." << endl; std::this_thread::sleep_for(60s); } current_cassandra_session.release(); return 0; } } int main(int argc, const char **argv) { return raw_mqtt_consumer::main(argc, argv); }