aboutsummaryrefslogtreecommitdiff
path: root/raw-mqtt-consumer.cpp
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2015-08-02 16:27:10 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2015-08-02 16:27:10 +0200
commit06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42 (patch)
treebc272faec6c585693341a814c65483e86c56255b /raw-mqtt-consumer.cpp
parentb632036b153297f83b10f6d960ccfe0c1772f00e (diff)
downloadmqtt-cassandra-bridge-06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42.tar.gz
mqtt-cassandra-bridge-06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42.tar.bz2
mqtt-cassandra-bridge-06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42.tar.xz
mqtt-cassandra-bridge-06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42.zip
o Adding a MQTT consumer that stores the parsed record in Cassandra.
Diffstat (limited to 'raw-mqtt-consumer.cpp')
-rw-r--r--raw-mqtt-consumer.cpp239
1 files changed, 239 insertions, 0 deletions
diff --git a/raw-mqtt-consumer.cpp b/raw-mqtt-consumer.cpp
new file mode 100644
index 0000000..421d18c
--- /dev/null
+++ b/raw-mqtt-consumer.cpp
@@ -0,0 +1,239 @@
+#include "cassandra_support.h"
+#include "mqtt_support.h"
+#include <thread>
+#include <boost/lexical_cast.hpp>
+#include <boost/program_options.hpp>
+#include <trygvis/sensor.h>
+#include <trygvis/sensor/io.h>
+
+namespace raw_mqtt_consumer {
+
+using namespace std;
+using namespace std::chrono;
+using namespace trygvis::cassandra_support;
+using namespace trygvis::mqtt_support;
+using namespace trygvis::sensor;
+using namespace trygvis::sensor::io;
+using namespace boost;
+namespace po = boost::program_options;
+
+static bool should_run;
+static string mqtt_host;
+static int mqtt_port;
+static string mqtt_topic;
+static string keyspace_name = "soil_moisture";
+
+static unique_ptr<cassandra_session> current_cassandra_session;
+
+struct measurement {
+ KeyDictionary dict;
+};
+
+class raw_mqtt_client : private mosqpp::mosquittopp {
+public:
+ typedef std::function<void(const struct mosquitto_message *)> callback_t;
+
+ raw_mqtt_client(callback_t on_message_) : mosquittopp(),
+ on_message_(on_message_) {
+ cout << "Connecting to " << mqtt_host << ":" << mqtt_port << endl;
+ loop_start();
+ connect_async(mqtt_host.c_str(), mqtt_port, 10);
+ }
+
+ ~raw_mqtt_client() {
+ loop_stop(true);
+ disconnect();
+ }
+
+private:
+ callback_t on_message_;
+ bool subscribed = false;
+
+ void on_connect(int rc) override {
+ cout << "Connected to MQTT broker, rc=" << rc << endl;
+// should_run = false;
+ int qos = 0;
+ if (!subscribed) {
+ subscribed = true;
+ cout << "Subscribing..." << endl;
+ subscribe(nullptr, mqtt_topic.c_str(), qos);
+ }
+ }
+
+ void on_disconnect(int rc) override {
+ subscribed = false;
+
+ cout << "Oops, disconnected, rc=" << rc << endl;
+ }
+
+ void on_publish(int mid) override {
+ }
+
+ void on_message(const struct mosquitto_message *message) override {
+ string payload((const char *) message->payload, (size_t) message->payloadlen);
+ on_message_(message);
+ }
+
+ void on_subscribe(int mid, int qos_count, const int *granted_qos) override {
+ cout << "Subscribed" << endl;
+ }
+
+ void on_unsubscribe(int mid) override {
+ cout << "Oops, unsubscribed" << endl;
+ }
+
+ void on_log(int level, const char *str) override {
+ cout << "MQTT: " << level << ":" << str << endl;
+ }
+
+ void on_error() override {
+ cout << "Oops, error" << endl;
+ }
+};
+
+cassandra_future2 *insert_into_raw(unique_ptr<cassandra_session> &session, const SampleRecord &record) {
+ cassandra_statement q("INSERT INTO raw_record(day, timestamp, records) VALUES (?, ?, ?);", 3);
+
+ auto system_now = system_clock::now();
+
+ char day[100];
+ std::time_t t = system_clock::to_time_t(system_now);
+ std::strftime(day, sizeof(day), "%Y-%m-%d", std::localtime(&t));
+ q.bind(0, day);
+ cout << "day=" << day << endl;
+
+ auto now_ms = std::chrono::time_point_cast<std::chrono::milliseconds>(system_now);
+ long timestamp = now_ms.time_since_epoch().count();
+ q.bind(1, timestamp);
+ cout << "timestamp=" << timestamp << endl;
+
+ auto buf = make_shared<stringstream>();
+ auto output = trygvis::sensor::io::open_sample_output_stream(buf, record.dict, sample_format_type::KEY_VALUE);
+ output->write(record);
+
+ cassandra_collection c(CASS_COLLECTION_TYPE_LIST, record.dict.size());
+ for_each(record.dict.begin(), record.dict.end(), [&](const SampleKey *key) {
+ cassandra_tuple tuple(2);
+ tuple.set(0, key->name);
+ o<string> value = record.at(key);
+ if (value) {
+ tuple.set(1, value.get());
+ }
+ c.append(std::move(tuple));
+ });
+
+ q.bind(2, c);
+
+ return session->execute2(std::move(q));
+}
+
+void on_message(const struct mosquitto_message *message) {
+ string payload((const char *) message->payload, (size_t) message->payloadlen);
+ cout << "storing message: " << endl;
+ cout << payload << endl;
+ cout << "----------------------------------------" << endl;
+
+ KeyDictionary dict;
+ auto sample_buffer = make_shared<VectorSampleOutputStream>();
+ auto input = make_shared<KeyValueSampleStreamParser>(sample_buffer, dict);
+
+ mutable_buffers_1 buffer = boost::asio::buffer(message->payload, (std::size_t) message->payloadlen);
+
+ input->process(buffer);
+ input->finish();
+
+ cout << "sample_buffer->samples: " << sample_buffer->samples.size() << endl;
+
+ if (current_cassandra_session) {
+ std::for_each(sample_buffer->samples.cbegin(), sample_buffer->samples.cend(), [&](auto &sample) {
+ cout << "Sample: " << sample.to_string() << endl;
+
+ insert_into_raw(current_cassandra_session, sample)->then([&](cassandra_future2 &f) {
+ if (f)
+ cout << "Success!" << endl;
+ else {
+ cout << "Error: " << f.error_message() << endl;
+ }
+ });
+
+ cout << "sample insert scheduled" << endl;
+ });
+ } else {
+ cout << "Not connected to Cassandra" << endl;
+ }
+}
+
+int main(int argc, const char **argv) {
+ mqtt_lib mqtt_lib;
+ cassandra_logging cassandra_logging;
+
+ string cassandra_cluster;
+ po::options_description all("Options");
+ all.add_options()("cassandra-cluster", po::value<string>(&cassandra_cluster)->default_value("127.0.0.1"));
+ all.add_options()("mqtt-host", po::value<>(&mqtt_host)->default_value("trygvis.io"));
+ all.add_options()("mqtt-port", po::value<>(&mqtt_port)->default_value(1883));
+ all.add_options()("mqtt-topic", po::value<>(&mqtt_topic)->required());
+
+ po::variables_map vm;
+ try {
+ auto parsed = po::parse_command_line(argc, argv, all);
+ po::store(parsed, vm);
+ po::notify(vm);
+ auto unrecognized = po::collect_unrecognized(parsed.options, po::include_positional);
+
+ if (vm.count("help")) {
+ cerr << all << "\n";
+ return EXIT_FAILURE;
+ }
+
+ if (unrecognized.size()) {
+ cerr << "Unrecognized option: " << unrecognized.at(0) << "\n";
+ return EXIT_FAILURE;
+ }
+
+ } catch (po::required_option &e) {
+ cerr << "Missing required option: " << e.get_option_name() << endl;
+ cerr << all << endl;
+ } catch (po::unknown_option &e) {
+ cerr << e.what() << endl;
+ return EXIT_FAILURE;
+ }
+
+ CassCluster *cluster = cass_cluster_new();
+ auto session = make_unique<cassandra_session>();
+
+ cout << "Connecting to Cassandra at " << cassandra_cluster << endl;
+ cass_cluster_set_contact_points(cluster, cassandra_cluster.c_str());
+
+ auto connect_future = session->connect(cluster);
+
+ if (cass_future_error_code(connect_future) != CASS_OK) {
+ string s = cassandra_future::error_message(connect_future);
+ cerr << "Could not connect to Cassandra: " << s << endl;
+ return EXIT_FAILURE;
+ }
+
+ execute_query(session->underlying(), "USE " + keyspace_name);
+
+ cout << "Connected to Cassandra" << endl;
+ current_cassandra_session = std::move(session);
+
+ raw_mqtt_client mqtt_client(on_message);
+
+ should_run = true;
+
+ while (should_run) {
+ cout << "sleeping.." << endl;
+ std::this_thread::sleep_for(60s);
+ }
+
+ current_cassandra_session.release();
+
+ return 0;
+}
+
+}
+
+int main(int argc, const char **argv) {
+ return raw_mqtt_consumer::main(argc, argv);
+}