aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2015-08-02 22:52:48 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2015-08-02 22:52:48 +0200
commitcf94e623ff3c3dcdd4ca6a59632d48bd17b75d55 (patch)
tree32aab58a0df0054e4e5c3bfe47c0299bfacddf25
parent06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42 (diff)
downloadmqtt-cassandra-bridge-cf94e623ff3c3dcdd4ca6a59632d48bd17b75d55.tar.gz
mqtt-cassandra-bridge-cf94e623ff3c3dcdd4ca6a59632d48bd17b75d55.tar.bz2
mqtt-cassandra-bridge-cf94e623ff3c3dcdd4ca6a59632d48bd17b75d55.tar.xz
mqtt-cassandra-bridge-cf94e623ff3c3dcdd4ca6a59632d48bd17b75d55.zip
o Adding better --help handling.
SM: Using "timestamp_ms" instead of "timestamp" as the timestamp key. SM: On reception, send a "cooked" message with the value of each device. To be improved.
-rw-r--r--raw-mqtt-consumer.cpp12
-rw-r--r--sm-http-server.cpp6
-rw-r--r--sm-mqtt-consumer.cpp104
3 files changed, 77 insertions, 45 deletions
diff --git a/raw-mqtt-consumer.cpp b/raw-mqtt-consumer.cpp
index 421d18c..86815c6 100644
--- a/raw-mqtt-consumer.cpp
+++ b/raw-mqtt-consumer.cpp
@@ -25,10 +25,6 @@ static string keyspace_name = "soil_moisture";
static unique_ptr<cassandra_session> current_cassandra_session;
-struct measurement {
- KeyDictionary dict;
-};
-
class raw_mqtt_client : private mosqpp::mosquittopp {
public:
typedef std::function<void(const struct mosquitto_message *)> callback_t;
@@ -148,7 +144,7 @@ void on_message(const struct mosquitto_message *message) {
std::for_each(sample_buffer->samples.cbegin(), sample_buffer->samples.cend(), [&](auto &sample) {
cout << "Sample: " << sample.to_string() << endl;
- insert_into_raw(current_cassandra_session, sample)->then([&](cassandra_future2 &f) {
+ insert_into_raw(current_cassandra_session, sample)->then([&](auto &f) {
if (f)
cout << "Success!" << endl;
else {
@@ -169,6 +165,7 @@ int main(int argc, const char **argv) {
string cassandra_cluster;
po::options_description all("Options");
+ all.add_options()("help", "Show command options");
all.add_options()("cassandra-cluster", po::value<string>(&cassandra_cluster)->default_value("127.0.0.1"));
all.add_options()("mqtt-host", po::value<>(&mqtt_host)->default_value("trygvis.io"));
all.add_options()("mqtt-port", po::value<>(&mqtt_port)->default_value(1883));
@@ -182,12 +179,13 @@ int main(int argc, const char **argv) {
auto unrecognized = po::collect_unrecognized(parsed.options, po::include_positional);
if (vm.count("help")) {
- cerr << all << "\n";
+ cerr << all << endl;
return EXIT_FAILURE;
}
if (unrecognized.size()) {
- cerr << "Unrecognized option: " << unrecognized.at(0) << "\n";
+ cerr << "Unrecognized option: " << unrecognized.at(0) << endl;
+ cerr << all << endl;
return EXIT_FAILURE;
}
diff --git a/sm-http-server.cpp b/sm-http-server.cpp
index 6b3169d..b9c3203 100644
--- a/sm-http-server.cpp
+++ b/sm-http-server.cpp
@@ -147,6 +147,7 @@ void on_logging_from_cassandra(const CassLogMessage *message, void *data) {
int main(int argc, const char *const argv[]) {
string cassandra_cluster;
po::options_description all("Options");
+ all.add_options()("help", "Show command options");
all.add_options()("cassandra-cluster", po::value<string>(&cassandra_cluster)->default_value("127.0.0.1"));
po::variables_map vm;
@@ -157,12 +158,13 @@ int main(int argc, const char *const argv[]) {
auto unrecognized = po::collect_unrecognized(parsed.options, po::include_positional);
if (vm.count("help")) {
- cerr << all << "\n";
+ cerr << all << endl;
return EXIT_FAILURE;
}
if (unrecognized.size()) {
- cerr << "Unrecognized option: " << unrecognized.at(0) << "\n";
+ cerr << "Unrecognized option: " << unrecognized.at(0) << endl;
+ cerr << all << endl;
return EXIT_FAILURE;
}
diff --git a/sm-mqtt-consumer.cpp b/sm-mqtt-consumer.cpp
index b91b663..e905d02 100644
--- a/sm-mqtt-consumer.cpp
+++ b/sm-mqtt-consumer.cpp
@@ -2,7 +2,6 @@
#include "mqtt_support.h"
#include "misc_support.h"
#include <thread>
-#include <boost/program_options.hpp>
#include <trygvis/sensor/io.h>
namespace sm_mqtt_consumer {
@@ -14,16 +13,20 @@ using namespace trygvis::sensor::io;
using namespace trygvis::cassandra_support;
using namespace trygvis::mqtt_support;
using namespace trygvis::misc_support;
-using namespace boost;
-namespace po = boost::program_options;
static bool should_run;
-static string mqtt_broker_host;
-static auto mqtt_broker_port = 1883;
-static auto queue_name = "/trygvis";
+static string mqtt_host;
+static int mqtt_port;
+static string raw_subscription;
+static string sm_topic;
+static int sm_qos;
+static bool sm_retain;
+
static string keyspace_name = "soil_moisture";
+class sm_mqtt_client;
static unique_ptr<cassandra_session> current_cassandra_session;
+static unique_ptr<sm_mqtt_client> mqtt_client;
struct sensor_measurement {
int sensor;
@@ -57,22 +60,36 @@ struct device_measurement {
}
};
-class mqtt_client : private mosqpp::mosquittopp {
+class sm_mqtt_client : private mosqpp::mosquittopp {
public:
typedef std::function<void(const struct mosquitto_message *)> callback_t;
- mqtt_client(callback_t on_message_) : mosquittopp(),
- on_message_(on_message_) {
- cout << "Connecting to " << mqtt_broker_host << ":" << mqtt_broker_port << endl;
+ sm_mqtt_client(callback_t on_message_) : mosquittopp(),
+ on_message_(on_message_) {
+ cout << "Connecting to " << mqtt_host << ":" << mqtt_port << endl;
loop_start();
- connect_async(mqtt_broker_host.c_str(), mqtt_broker_port, 10);
+ connect_async(mqtt_host.c_str(), mqtt_port, 10);
}
- ~mqtt_client() {
+ ~sm_mqtt_client() {
loop_stop(true);
disconnect();
}
+ void send_sm_messages(string device, const device_measurement &measurement) {
+ string device_topic = sm_topic + "/" + device;
+
+ for_each(measurement.sensors.begin(), measurement.sensors.end(), [&](auto sensor) {
+ auto sensor_topic = device_topic + "/" + to_string(sensor.sensor);
+
+ // TODO: iterate over all values
+
+ auto payload = std::to_string(sensor.value);
+ auto topic = sensor_topic + "/value";
+ this->publish(nullptr, topic.c_str(), (int) payload.length(), payload.c_str(), sm_qos, sm_retain);
+ });
+ }
+
private:
callback_t on_message_;
bool subscribed = false;
@@ -83,8 +100,8 @@ private:
int qos = 0;
if (!subscribed) {
subscribed = true;
- cout << "Subscribing..." << endl;
- subscribe(nullptr, queue_name, qos);
+ cout << "Subscribing to " << raw_subscription << endl;
+ subscribe(nullptr, raw_subscription.c_str(), qos);
}
}
@@ -119,17 +136,17 @@ private:
}
};
-auto insert_into_sm_by_day(CassSession *session, device_measurement &&measurement) {
- cassandra_statement q("INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);", 4);
+cassandra_future2* insert_into_sm_by_day(cassandra_session *session, device_measurement &&measurement) {
+ cassandra_statement stmt("INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);", 4);
- q.bind(0, measurement.device);
+ stmt.bind(0, measurement.device);
std::time_t t = measurement.timestamp;
char day[100];
std::strftime(day, sizeof(day), "%Y-%m-%d", std::localtime(&t));
- q.bind(1, day);
+ stmt.bind(1, day);
- q.bind(2, measurement.timestamp * 1000);
+ stmt.bind(2, measurement.timestamp * 1000);
cassandra_collection sensors(CASS_COLLECTION_TYPE_LIST, measurement.sensors.size());
for_each(measurement.sensors.cbegin(), measurement.sensors.cend(), [&](auto sensor) {
@@ -139,14 +156,14 @@ auto insert_into_sm_by_day(CassSession *session, device_measurement &&measuremen
sensors.append(std::move(tuple));
});
- q.bind(3, sensors);
+ stmt.bind(3, sensors);
- return cass_session_execute(session, q);
+ return session->execute2(std::move(stmt));
}
void on_message(const struct mosquitto_message *message) {
string payload((const char *) message->payload, (size_t) message->payloadlen);
- cout << "storing message: " << endl;
+ cout << "Got message on " << message->topic << ":" << endl;
cout << payload << endl;
cout << "----------------------------------------" << endl;
@@ -162,7 +179,7 @@ void on_message(const struct mosquitto_message *message) {
cout << "sample_buffer->samples: " << sample_buffer->samples.size() << endl;
auto device_key = dict.indexOf("device");
- auto timestamp_key = dict.indexOf("timestamp");
+ auto timestamp_key = dict.indexOf("timestamp_ms");
std::for_each(sample_buffer->samples.cbegin(), sample_buffer->samples.cend(), [&](auto &sample) {
cout << "Sample: " << sample.to_string() << endl;
@@ -172,10 +189,12 @@ void on_message(const struct mosquitto_message *message) {
if (!deviceO) {
cout << "Missing required key 'device'" << endl;
+ return;
}
if (!timestampS) {
cout << "Missing required key 'timestamp'" << endl;
+ return;
}
auto device = deviceO.get();
@@ -183,6 +202,7 @@ void on_message(const struct mosquitto_message *message) {
auto timestamp = flat_map(timestampS, l_c<long>);
if (!timestamp) {
cout << "Invalid value for 'timestamp'" << endl;
+ return;
}
vector<sensor_measurement> sensors;
@@ -208,24 +228,33 @@ void on_message(const struct mosquitto_message *message) {
cout << "Measurement: " << measurement.str() << endl;
if (current_cassandra_session) {
- handle_future(insert_into_sm_by_day((CassSession*) current_cassandra_session.get(), std::move(measurement)), [&](auto future) {
- cout << "Success!" << endl;
- }, [&](auto future, auto err) {
- cout << "Failure: " << cassandra_future::error_message(future) << endl;
+ insert_into_sm_by_day(current_cassandra_session.get(), std::move(measurement))->then([&](auto &f) {
+ if (f) {
+ cout << "INSERT INTO sm_by_day OK" << endl;
+ } else {
+ cout << "INSERT INTO sm_by_day failed: " << f.error_message() << endl;
+ }
});
} else {
cout << "Not connected to Cassandra" << endl;
}
+
+ mqtt_client->send_sm_messages(device, measurement);
});
}
-mqtt_lib mqtt_lib;
-
int main(int argc, const char **argv) {
+ mqtt_lib mqtt_lib;
+ cassandra_logging cassandra_logging;
+
string cassandra_cluster;
po::options_description all("Options");
+ all.add_options()("help", "Show command options");
all.add_options()("cassandra-cluster", po::value<string>(&cassandra_cluster)->default_value("127.0.0.1"));
- all.add_options()("mqtt-broker-host", po::value<string>(&mqtt_broker_host)->default_value("trygvis.io"));
+ all.add_options()("mqtt-host", po::value<>(&mqtt_host)->default_value("trygvis.io"));
+ all.add_options()("mqtt-port", po::value<>(&mqtt_port)->default_value(1883));
+ all.add_options()("raw-subscription", po::value<>(&raw_subscription)->default_value("/soil-moisture/raw/#"));
+ all.add_options()("sm-topic", po::value<>(&sm_topic)->default_value("/soil-moisture/device"));
po::variables_map vm;
try {
@@ -235,12 +264,13 @@ int main(int argc, const char **argv) {
auto unrecognized = po::collect_unrecognized(parsed.options, po::include_positional);
if (vm.count("help")) {
- cerr << all << "\n";
+ cerr << all << endl;
return EXIT_FAILURE;
}
if (unrecognized.size()) {
- cerr << "Unrecognized option: " << unrecognized.at(0) << "\n";
+ cerr << "Unrecognized option: " << unrecognized.at(0) << endl;
+ cerr << all << endl;
return EXIT_FAILURE;
}
@@ -252,14 +282,16 @@ int main(int argc, const char **argv) {
return EXIT_FAILURE;
}
- mqtt_client mqtt_client(on_message);
+ mqtt_client = make_unique<sm_mqtt_client>(on_message);
+
CassFuture *connect_future = nullptr;
CassCluster *cluster = cass_cluster_new();
auto session = make_unique<cassandra_session>();
+ cout << "Connecting to cassandra cluster: " << cassandra_cluster << endl;
cass_cluster_set_contact_points(cluster, cassandra_cluster.c_str());
- connect_future = cass_session_connect((CassSession*) session.get(), cluster);
+ connect_future = session->connect(cluster);
if (cass_future_error_code(connect_future) != CASS_OK) {
string s = cassandra_future::error_message(connect_future);
@@ -267,11 +299,11 @@ int main(int argc, const char **argv) {
return EXIT_FAILURE;
}
+ execute_query(session->underlying(), "USE " + keyspace_name);
+
cout << "Connected to Cassandra" << endl;
current_cassandra_session = std::move(session);
- execute_query((CassSession*) current_cassandra_session.get(), "USE " + keyspace_name);
-
should_run = true;
while (should_run) {
cout << "sleeping.." << endl;