diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2015-08-02 22:52:48 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2015-08-02 22:52:48 +0200 |
commit | cf94e623ff3c3dcdd4ca6a59632d48bd17b75d55 (patch) | |
tree | 32aab58a0df0054e4e5c3bfe47c0299bfacddf25 | |
parent | 06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42 (diff) | |
download | mqtt-cassandra-bridge-cf94e623ff3c3dcdd4ca6a59632d48bd17b75d55.tar.gz mqtt-cassandra-bridge-cf94e623ff3c3dcdd4ca6a59632d48bd17b75d55.tar.bz2 mqtt-cassandra-bridge-cf94e623ff3c3dcdd4ca6a59632d48bd17b75d55.tar.xz mqtt-cassandra-bridge-cf94e623ff3c3dcdd4ca6a59632d48bd17b75d55.zip |
o Adding better --help handling.
SM: Using "timestamp_ms" instead of "timestamp" as the timestamp key.
SM: On reception, send a "cooked" message with the value of each device. To be improved.
-rw-r--r-- | raw-mqtt-consumer.cpp | 12 | ||||
-rw-r--r-- | sm-http-server.cpp | 6 | ||||
-rw-r--r-- | sm-mqtt-consumer.cpp | 104 |
3 files changed, 77 insertions, 45 deletions
diff --git a/raw-mqtt-consumer.cpp b/raw-mqtt-consumer.cpp index 421d18c..86815c6 100644 --- a/raw-mqtt-consumer.cpp +++ b/raw-mqtt-consumer.cpp @@ -25,10 +25,6 @@ static string keyspace_name = "soil_moisture"; static unique_ptr<cassandra_session> current_cassandra_session; -struct measurement { - KeyDictionary dict; -}; - class raw_mqtt_client : private mosqpp::mosquittopp { public: typedef std::function<void(const struct mosquitto_message *)> callback_t; @@ -148,7 +144,7 @@ void on_message(const struct mosquitto_message *message) { std::for_each(sample_buffer->samples.cbegin(), sample_buffer->samples.cend(), [&](auto &sample) { cout << "Sample: " << sample.to_string() << endl; - insert_into_raw(current_cassandra_session, sample)->then([&](cassandra_future2 &f) { + insert_into_raw(current_cassandra_session, sample)->then([&](auto &f) { if (f) cout << "Success!" << endl; else { @@ -169,6 +165,7 @@ int main(int argc, const char **argv) { string cassandra_cluster; po::options_description all("Options"); + all.add_options()("help", "Show command options"); all.add_options()("cassandra-cluster", po::value<string>(&cassandra_cluster)->default_value("127.0.0.1")); all.add_options()("mqtt-host", po::value<>(&mqtt_host)->default_value("trygvis.io")); all.add_options()("mqtt-port", po::value<>(&mqtt_port)->default_value(1883)); @@ -182,12 +179,13 @@ int main(int argc, const char **argv) { auto unrecognized = po::collect_unrecognized(parsed.options, po::include_positional); if (vm.count("help")) { - cerr << all << "\n"; + cerr << all << endl; return EXIT_FAILURE; } if (unrecognized.size()) { - cerr << "Unrecognized option: " << unrecognized.at(0) << "\n"; + cerr << "Unrecognized option: " << unrecognized.at(0) << endl; + cerr << all << endl; return EXIT_FAILURE; } diff --git a/sm-http-server.cpp b/sm-http-server.cpp index 6b3169d..b9c3203 100644 --- a/sm-http-server.cpp +++ b/sm-http-server.cpp @@ -147,6 +147,7 @@ void on_logging_from_cassandra(const CassLogMessage *message, void *data) { int main(int argc, const char *const argv[]) { string cassandra_cluster; po::options_description all("Options"); + all.add_options()("help", "Show command options"); all.add_options()("cassandra-cluster", po::value<string>(&cassandra_cluster)->default_value("127.0.0.1")); po::variables_map vm; @@ -157,12 +158,13 @@ int main(int argc, const char *const argv[]) { auto unrecognized = po::collect_unrecognized(parsed.options, po::include_positional); if (vm.count("help")) { - cerr << all << "\n"; + cerr << all << endl; return EXIT_FAILURE; } if (unrecognized.size()) { - cerr << "Unrecognized option: " << unrecognized.at(0) << "\n"; + cerr << "Unrecognized option: " << unrecognized.at(0) << endl; + cerr << all << endl; return EXIT_FAILURE; } diff --git a/sm-mqtt-consumer.cpp b/sm-mqtt-consumer.cpp index b91b663..e905d02 100644 --- a/sm-mqtt-consumer.cpp +++ b/sm-mqtt-consumer.cpp @@ -2,7 +2,6 @@ #include "mqtt_support.h" #include "misc_support.h" #include <thread> -#include <boost/program_options.hpp> #include <trygvis/sensor/io.h> namespace sm_mqtt_consumer { @@ -14,16 +13,20 @@ using namespace trygvis::sensor::io; using namespace trygvis::cassandra_support; using namespace trygvis::mqtt_support; using namespace trygvis::misc_support; -using namespace boost; -namespace po = boost::program_options; static bool should_run; -static string mqtt_broker_host; -static auto mqtt_broker_port = 1883; -static auto queue_name = "/trygvis"; +static string mqtt_host; +static int mqtt_port; +static string raw_subscription; +static string sm_topic; +static int sm_qos; +static bool sm_retain; + static string keyspace_name = "soil_moisture"; +class sm_mqtt_client; static unique_ptr<cassandra_session> current_cassandra_session; +static unique_ptr<sm_mqtt_client> mqtt_client; struct sensor_measurement { int sensor; @@ -57,22 +60,36 @@ struct device_measurement { } }; -class mqtt_client : private mosqpp::mosquittopp { +class sm_mqtt_client : private mosqpp::mosquittopp { public: typedef std::function<void(const struct mosquitto_message *)> callback_t; - mqtt_client(callback_t on_message_) : mosquittopp(), - on_message_(on_message_) { - cout << "Connecting to " << mqtt_broker_host << ":" << mqtt_broker_port << endl; + sm_mqtt_client(callback_t on_message_) : mosquittopp(), + on_message_(on_message_) { + cout << "Connecting to " << mqtt_host << ":" << mqtt_port << endl; loop_start(); - connect_async(mqtt_broker_host.c_str(), mqtt_broker_port, 10); + connect_async(mqtt_host.c_str(), mqtt_port, 10); } - ~mqtt_client() { + ~sm_mqtt_client() { loop_stop(true); disconnect(); } + void send_sm_messages(string device, const device_measurement &measurement) { + string device_topic = sm_topic + "/" + device; + + for_each(measurement.sensors.begin(), measurement.sensors.end(), [&](auto sensor) { + auto sensor_topic = device_topic + "/" + to_string(sensor.sensor); + + // TODO: iterate over all values + + auto payload = std::to_string(sensor.value); + auto topic = sensor_topic + "/value"; + this->publish(nullptr, topic.c_str(), (int) payload.length(), payload.c_str(), sm_qos, sm_retain); + }); + } + private: callback_t on_message_; bool subscribed = false; @@ -83,8 +100,8 @@ private: int qos = 0; if (!subscribed) { subscribed = true; - cout << "Subscribing..." << endl; - subscribe(nullptr, queue_name, qos); + cout << "Subscribing to " << raw_subscription << endl; + subscribe(nullptr, raw_subscription.c_str(), qos); } } @@ -119,17 +136,17 @@ private: } }; -auto insert_into_sm_by_day(CassSession *session, device_measurement &&measurement) { - cassandra_statement q("INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);", 4); +cassandra_future2* insert_into_sm_by_day(cassandra_session *session, device_measurement &&measurement) { + cassandra_statement stmt("INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);", 4); - q.bind(0, measurement.device); + stmt.bind(0, measurement.device); std::time_t t = measurement.timestamp; char day[100]; std::strftime(day, sizeof(day), "%Y-%m-%d", std::localtime(&t)); - q.bind(1, day); + stmt.bind(1, day); - q.bind(2, measurement.timestamp * 1000); + stmt.bind(2, measurement.timestamp * 1000); cassandra_collection sensors(CASS_COLLECTION_TYPE_LIST, measurement.sensors.size()); for_each(measurement.sensors.cbegin(), measurement.sensors.cend(), [&](auto sensor) { @@ -139,14 +156,14 @@ auto insert_into_sm_by_day(CassSession *session, device_measurement &&measuremen sensors.append(std::move(tuple)); }); - q.bind(3, sensors); + stmt.bind(3, sensors); - return cass_session_execute(session, q); + return session->execute2(std::move(stmt)); } void on_message(const struct mosquitto_message *message) { string payload((const char *) message->payload, (size_t) message->payloadlen); - cout << "storing message: " << endl; + cout << "Got message on " << message->topic << ":" << endl; cout << payload << endl; cout << "----------------------------------------" << endl; @@ -162,7 +179,7 @@ void on_message(const struct mosquitto_message *message) { cout << "sample_buffer->samples: " << sample_buffer->samples.size() << endl; auto device_key = dict.indexOf("device"); - auto timestamp_key = dict.indexOf("timestamp"); + auto timestamp_key = dict.indexOf("timestamp_ms"); std::for_each(sample_buffer->samples.cbegin(), sample_buffer->samples.cend(), [&](auto &sample) { cout << "Sample: " << sample.to_string() << endl; @@ -172,10 +189,12 @@ void on_message(const struct mosquitto_message *message) { if (!deviceO) { cout << "Missing required key 'device'" << endl; + return; } if (!timestampS) { cout << "Missing required key 'timestamp'" << endl; + return; } auto device = deviceO.get(); @@ -183,6 +202,7 @@ void on_message(const struct mosquitto_message *message) { auto timestamp = flat_map(timestampS, l_c<long>); if (!timestamp) { cout << "Invalid value for 'timestamp'" << endl; + return; } vector<sensor_measurement> sensors; @@ -208,24 +228,33 @@ void on_message(const struct mosquitto_message *message) { cout << "Measurement: " << measurement.str() << endl; if (current_cassandra_session) { - handle_future(insert_into_sm_by_day((CassSession*) current_cassandra_session.get(), std::move(measurement)), [&](auto future) { - cout << "Success!" << endl; - }, [&](auto future, auto err) { - cout << "Failure: " << cassandra_future::error_message(future) << endl; + insert_into_sm_by_day(current_cassandra_session.get(), std::move(measurement))->then([&](auto &f) { + if (f) { + cout << "INSERT INTO sm_by_day OK" << endl; + } else { + cout << "INSERT INTO sm_by_day failed: " << f.error_message() << endl; + } }); } else { cout << "Not connected to Cassandra" << endl; } + + mqtt_client->send_sm_messages(device, measurement); }); } -mqtt_lib mqtt_lib; - int main(int argc, const char **argv) { + mqtt_lib mqtt_lib; + cassandra_logging cassandra_logging; + string cassandra_cluster; po::options_description all("Options"); + all.add_options()("help", "Show command options"); all.add_options()("cassandra-cluster", po::value<string>(&cassandra_cluster)->default_value("127.0.0.1")); - all.add_options()("mqtt-broker-host", po::value<string>(&mqtt_broker_host)->default_value("trygvis.io")); + all.add_options()("mqtt-host", po::value<>(&mqtt_host)->default_value("trygvis.io")); + all.add_options()("mqtt-port", po::value<>(&mqtt_port)->default_value(1883)); + all.add_options()("raw-subscription", po::value<>(&raw_subscription)->default_value("/soil-moisture/raw/#")); + all.add_options()("sm-topic", po::value<>(&sm_topic)->default_value("/soil-moisture/device")); po::variables_map vm; try { @@ -235,12 +264,13 @@ int main(int argc, const char **argv) { auto unrecognized = po::collect_unrecognized(parsed.options, po::include_positional); if (vm.count("help")) { - cerr << all << "\n"; + cerr << all << endl; return EXIT_FAILURE; } if (unrecognized.size()) { - cerr << "Unrecognized option: " << unrecognized.at(0) << "\n"; + cerr << "Unrecognized option: " << unrecognized.at(0) << endl; + cerr << all << endl; return EXIT_FAILURE; } @@ -252,14 +282,16 @@ int main(int argc, const char **argv) { return EXIT_FAILURE; } - mqtt_client mqtt_client(on_message); + mqtt_client = make_unique<sm_mqtt_client>(on_message); + CassFuture *connect_future = nullptr; CassCluster *cluster = cass_cluster_new(); auto session = make_unique<cassandra_session>(); + cout << "Connecting to cassandra cluster: " << cassandra_cluster << endl; cass_cluster_set_contact_points(cluster, cassandra_cluster.c_str()); - connect_future = cass_session_connect((CassSession*) session.get(), cluster); + connect_future = session->connect(cluster); if (cass_future_error_code(connect_future) != CASS_OK) { string s = cassandra_future::error_message(connect_future); @@ -267,11 +299,11 @@ int main(int argc, const char **argv) { return EXIT_FAILURE; } + execute_query(session->underlying(), "USE " + keyspace_name); + cout << "Connected to Cassandra" << endl; current_cassandra_session = std::move(session); - execute_query((CassSession*) current_cassandra_session.get(), "USE " + keyspace_name); - should_run = true; while (should_run) { cout << "sleeping.." << endl; |