diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2015-08-02 16:27:10 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2015-08-02 16:27:10 +0200 |
commit | 06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42 (patch) | |
tree | bc272faec6c585693341a814c65483e86c56255b /raw-mqtt-consumer.cpp | |
parent | b632036b153297f83b10f6d960ccfe0c1772f00e (diff) | |
download | mqtt-cassandra-bridge-06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42.tar.gz mqtt-cassandra-bridge-06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42.tar.bz2 mqtt-cassandra-bridge-06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42.tar.xz mqtt-cassandra-bridge-06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42.zip |
o Adding a MQTT consumer that stores the parsed record in Cassandra.
Diffstat (limited to 'raw-mqtt-consumer.cpp')
-rw-r--r-- | raw-mqtt-consumer.cpp | 239 |
1 files changed, 239 insertions, 0 deletions
diff --git a/raw-mqtt-consumer.cpp b/raw-mqtt-consumer.cpp new file mode 100644 index 0000000..421d18c --- /dev/null +++ b/raw-mqtt-consumer.cpp @@ -0,0 +1,239 @@ +#include "cassandra_support.h" +#include "mqtt_support.h" +#include <thread> +#include <boost/lexical_cast.hpp> +#include <boost/program_options.hpp> +#include <trygvis/sensor.h> +#include <trygvis/sensor/io.h> + +namespace raw_mqtt_consumer { + +using namespace std; +using namespace std::chrono; +using namespace trygvis::cassandra_support; +using namespace trygvis::mqtt_support; +using namespace trygvis::sensor; +using namespace trygvis::sensor::io; +using namespace boost; +namespace po = boost::program_options; + +static bool should_run; +static string mqtt_host; +static int mqtt_port; +static string mqtt_topic; +static string keyspace_name = "soil_moisture"; + +static unique_ptr<cassandra_session> current_cassandra_session; + +struct measurement { + KeyDictionary dict; +}; + +class raw_mqtt_client : private mosqpp::mosquittopp { +public: + typedef std::function<void(const struct mosquitto_message *)> callback_t; + + raw_mqtt_client(callback_t on_message_) : mosquittopp(), + on_message_(on_message_) { + cout << "Connecting to " << mqtt_host << ":" << mqtt_port << endl; + loop_start(); + connect_async(mqtt_host.c_str(), mqtt_port, 10); + } + + ~raw_mqtt_client() { + loop_stop(true); + disconnect(); + } + +private: + callback_t on_message_; + bool subscribed = false; + + void on_connect(int rc) override { + cout << "Connected to MQTT broker, rc=" << rc << endl; +// should_run = false; + int qos = 0; + if (!subscribed) { + subscribed = true; + cout << "Subscribing..." << endl; + subscribe(nullptr, mqtt_topic.c_str(), qos); + } + } + + void on_disconnect(int rc) override { + subscribed = false; + + cout << "Oops, disconnected, rc=" << rc << endl; + } + + void on_publish(int mid) override { + } + + void on_message(const struct mosquitto_message *message) override { + string payload((const char *) message->payload, (size_t) message->payloadlen); + on_message_(message); + } + + void on_subscribe(int mid, int qos_count, const int *granted_qos) override { + cout << "Subscribed" << endl; + } + + void on_unsubscribe(int mid) override { + cout << "Oops, unsubscribed" << endl; + } + + void on_log(int level, const char *str) override { + cout << "MQTT: " << level << ":" << str << endl; + } + + void on_error() override { + cout << "Oops, error" << endl; + } +}; + +cassandra_future2 *insert_into_raw(unique_ptr<cassandra_session> &session, const SampleRecord &record) { + cassandra_statement q("INSERT INTO raw_record(day, timestamp, records) VALUES (?, ?, ?);", 3); + + auto system_now = system_clock::now(); + + char day[100]; + std::time_t t = system_clock::to_time_t(system_now); + std::strftime(day, sizeof(day), "%Y-%m-%d", std::localtime(&t)); + q.bind(0, day); + cout << "day=" << day << endl; + + auto now_ms = std::chrono::time_point_cast<std::chrono::milliseconds>(system_now); + long timestamp = now_ms.time_since_epoch().count(); + q.bind(1, timestamp); + cout << "timestamp=" << timestamp << endl; + + auto buf = make_shared<stringstream>(); + auto output = trygvis::sensor::io::open_sample_output_stream(buf, record.dict, sample_format_type::KEY_VALUE); + output->write(record); + + cassandra_collection c(CASS_COLLECTION_TYPE_LIST, record.dict.size()); + for_each(record.dict.begin(), record.dict.end(), [&](const SampleKey *key) { + cassandra_tuple tuple(2); + tuple.set(0, key->name); + o<string> value = record.at(key); + if (value) { + tuple.set(1, value.get()); + } + c.append(std::move(tuple)); + }); + + q.bind(2, c); + + return session->execute2(std::move(q)); +} + +void on_message(const struct mosquitto_message *message) { + string payload((const char *) message->payload, (size_t) message->payloadlen); + cout << "storing message: " << endl; + cout << payload << endl; + cout << "----------------------------------------" << endl; + + KeyDictionary dict; + auto sample_buffer = make_shared<VectorSampleOutputStream>(); + auto input = make_shared<KeyValueSampleStreamParser>(sample_buffer, dict); + + mutable_buffers_1 buffer = boost::asio::buffer(message->payload, (std::size_t) message->payloadlen); + + input->process(buffer); + input->finish(); + + cout << "sample_buffer->samples: " << sample_buffer->samples.size() << endl; + + if (current_cassandra_session) { + std::for_each(sample_buffer->samples.cbegin(), sample_buffer->samples.cend(), [&](auto &sample) { + cout << "Sample: " << sample.to_string() << endl; + + insert_into_raw(current_cassandra_session, sample)->then([&](cassandra_future2 &f) { + if (f) + cout << "Success!" << endl; + else { + cout << "Error: " << f.error_message() << endl; + } + }); + + cout << "sample insert scheduled" << endl; + }); + } else { + cout << "Not connected to Cassandra" << endl; + } +} + +int main(int argc, const char **argv) { + mqtt_lib mqtt_lib; + cassandra_logging cassandra_logging; + + string cassandra_cluster; + po::options_description all("Options"); + all.add_options()("cassandra-cluster", po::value<string>(&cassandra_cluster)->default_value("127.0.0.1")); + all.add_options()("mqtt-host", po::value<>(&mqtt_host)->default_value("trygvis.io")); + all.add_options()("mqtt-port", po::value<>(&mqtt_port)->default_value(1883)); + all.add_options()("mqtt-topic", po::value<>(&mqtt_topic)->required()); + + po::variables_map vm; + try { + auto parsed = po::parse_command_line(argc, argv, all); + po::store(parsed, vm); + po::notify(vm); + auto unrecognized = po::collect_unrecognized(parsed.options, po::include_positional); + + if (vm.count("help")) { + cerr << all << "\n"; + return EXIT_FAILURE; + } + + if (unrecognized.size()) { + cerr << "Unrecognized option: " << unrecognized.at(0) << "\n"; + return EXIT_FAILURE; + } + + } catch (po::required_option &e) { + cerr << "Missing required option: " << e.get_option_name() << endl; + cerr << all << endl; + } catch (po::unknown_option &e) { + cerr << e.what() << endl; + return EXIT_FAILURE; + } + + CassCluster *cluster = cass_cluster_new(); + auto session = make_unique<cassandra_session>(); + + cout << "Connecting to Cassandra at " << cassandra_cluster << endl; + cass_cluster_set_contact_points(cluster, cassandra_cluster.c_str()); + + auto connect_future = session->connect(cluster); + + if (cass_future_error_code(connect_future) != CASS_OK) { + string s = cassandra_future::error_message(connect_future); + cerr << "Could not connect to Cassandra: " << s << endl; + return EXIT_FAILURE; + } + + execute_query(session->underlying(), "USE " + keyspace_name); + + cout << "Connected to Cassandra" << endl; + current_cassandra_session = std::move(session); + + raw_mqtt_client mqtt_client(on_message); + + should_run = true; + + while (should_run) { + cout << "sleeping.." << endl; + std::this_thread::sleep_for(60s); + } + + current_cassandra_session.release(); + + return 0; +} + +} + +int main(int argc, const char **argv) { + return raw_mqtt_consumer::main(argc, argv); +} |