aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cassandra_support.h17
-rw-r--r--mqtt_support.h1
-rw-r--r--raw-mqtt-consumer.cpp98
-rw-r--r--sm-http-server.cpp4
-rw-r--r--sm-mqtt-consumer.cpp8
-rw-r--r--trireme/db/migrations/2015-08-04_2_more_details_to_raw.cql8
6 files changed, 96 insertions, 40 deletions
diff --git a/cassandra_support.h b/cassandra_support.h
index 3f49be2..2e3ea19 100644
--- a/cassandra_support.h
+++ b/cassandra_support.h
@@ -395,32 +395,37 @@ public:
cassandra_statement &operator=(const cassandra_statement &) = delete;
- void bind(size_t i, const string &value) {
+ void bind_string(size_t i, const string &value) {
auto err = cass_statement_bind_string(statement, i, value.c_str());
assert_ok("cass_statement_bind_string", err);
}
- void bind(size_t i, const char *value) {
+ void bind_string(size_t i, const char *value) {
auto err = cass_statement_bind_string(statement, i, value);
assert_ok("cass_statement_bind_string", err);
}
- void bind(size_t i, const cass_int64_t value) {
+ void bind_int32(size_t i, const cass_int32_t value) {
+ auto err = cass_statement_bind_int32(statement, i, value);
+ assert_ok("cass_statement_bind_int32", err);
+ }
+
+ void bind_int64(size_t i, const cass_int64_t value) {
auto err = cass_statement_bind_int64(statement, i, value);
assert_ok("cass_statement_bind_int64", err);
}
- void bind(size_t i, const CassCollection *value) {
+ void bind_collection(size_t i, const CassCollection *value) {
auto err = cass_statement_bind_collection(statement, i, value);
assert_ok("cass_statement_bind_collection", err);
}
- void bind(size_t i, const cassandra_collection &value) {
+ void bind_collection(size_t i, const cassandra_collection &value) {
auto err = cass_statement_bind_collection(statement, i, value);
assert_ok("cass_statement_bind_collection", err);
}
- void bind(size_t i, const std::vector<string> &&values) {
+ void bind_list(size_t i, const std::vector<string> &&values) {
cassandra_collection c(CassCollectionType::CASS_COLLECTION_TYPE_LIST, values.size());
for (const auto &value : values) {
diff --git a/mqtt_support.h b/mqtt_support.h
index 9a9f7fe..2ed5134 100644
--- a/mqtt_support.h
+++ b/mqtt_support.h
@@ -2,6 +2,7 @@
#define TRYGVIS_MQTT_SUPPORT_H
#include "mosquittopp.h"
+#include "mosquitto.h"
namespace trygvis {
namespace mqtt_support {
diff --git a/raw-mqtt-consumer.cpp b/raw-mqtt-consumer.cpp
index 04453e6..3cc43be 100644
--- a/raw-mqtt-consumer.cpp
+++ b/raw-mqtt-consumer.cpp
@@ -23,89 +23,130 @@ static string keyspace_name = "soil_moisture";
static unique_ptr<cassandra_session> current_cassandra_session;
-class raw_mqtt_client : private mosqpp::mosquittopp {
+class raw_mqtt_client /*: private mosqpp::mosquittopp*/ {
public:
typedef std::function<void(const struct mosquitto_message *)> callback_t;
- raw_mqtt_client(callback_t on_message_) : mosquittopp(),
- on_message_(on_message_) {
+ raw_mqtt_client(callback_t on_message_) : on_message_(on_message_) {
cout << "Connecting to " << mqtt_host << ":" << mqtt_port << endl;
- loop_start();
- connect_async(mqtt_host.c_str(), mqtt_port, 10);
+
+ mosquitto = mosquitto_new(nullptr, true, this);
+ if(!mosquitto) {
+ throw runtime_error("Could not initialize mosquitto instance");
+ }
+ mosquitto_connect_callback_set(mosquitto, on_connect_cb);
+ mosquitto_disconnect_callback_set(mosquitto, on_disconnect_cb);
+ mosquitto_publish_callback_set(mosquitto, on_publish_cb);
+ mosquitto_message_callback_set(mosquitto, on_message_cb);
+ mosquitto_subscribe_callback_set(mosquitto, on_subscribe_cb);
+ mosquitto_unsubscribe_callback_set(mosquitto, on_unsubscribe_cb);
+ mosquitto_log_callback_set(mosquitto, on_log_cb);
+
+ mosquitto_loop_start(mosquitto);
+ mosquitto_connect_async(mosquitto, mqtt_host.c_str(), mqtt_port, 10);
}
~raw_mqtt_client() {
- loop_stop(true);
- disconnect();
+ mosquitto_loop_stop(mosquitto, true);
+ mosquitto_disconnect(mosquitto);
}
private:
callback_t on_message_;
bool subscribed = false;
+ struct mosquitto *mosquitto;
- void on_connect(int rc) override {
+ static void on_connect_cb(struct mosquitto *m, void *self, int rc) {
+ static_cast<raw_mqtt_client *>(self)->on_connect(rc);
+ }
+
+ void on_connect(int rc) {
cout << "Connected to MQTT broker, rc=" << rc << endl;
// should_run = false;
int qos = 0;
if (!subscribed) {
subscribed = true;
cout << "Subscribing..." << endl;
- subscribe(nullptr, mqtt_topic.c_str(), qos);
+ mosquitto_subscribe(mosquitto, nullptr, mqtt_topic.c_str(), qos);
}
}
- void on_disconnect(int rc) override {
+ static void on_disconnect_cb(struct mosquitto *m, void *self, int rc) {
+ static_cast<raw_mqtt_client *>(self)->on_disconnect(rc);
+ }
+
+ void on_disconnect(int rc) {
subscribed = false;
- cout << "Oops, disconnected, rc=" << rc << endl;
+ cout << "Oops, disconnected, rc=" << rc << ":" << mosqpp::strerror(rc) << endl;
+ }
+
+ static void on_publish_cb(struct mosquitto *m, void *self, int rc) {
+ static_cast<raw_mqtt_client *>(self)->on_publish(rc);
}
- void on_publish(int mid) override {
+ void on_publish(int mid) {
}
- void on_message(const struct mosquitto_message *message) override {
- string payload((const char *) message->payload, (size_t) message->payloadlen);
+ static void on_message_cb(struct mosquitto *m, void *self, const mosquitto_message *message) {
+ static_cast<raw_mqtt_client *>(self)->on_message(message);
+ }
+
+ void on_message(const struct mosquitto_message *message) {
on_message_(message);
}
- void on_subscribe(int mid, int qos_count, const int *granted_qos) override {
+ static void on_subscribe_cb(struct mosquitto *m, void *self, int mid, int qos_count, const int *granted_qos) {
+ static_cast<raw_mqtt_client *>(self)->on_subscribe(mid, qos_count, granted_qos);
+ }
+
+ void on_subscribe(int mid, int qos_count, const int *granted_qos) {
cout << "Subscribed" << endl;
}
- void on_unsubscribe(int mid) override {
+ static void on_unsubscribe_cb(struct mosquitto *m, void *self, int mid) {
+ static_cast<raw_mqtt_client *>(self)->on_unsubscribe(mid);
+ }
+
+ void on_unsubscribe(int mid) {
cout << "Oops, unsubscribed" << endl;
}
- void on_log(int level, const char *str) override {
- cout << "MQTT: " << level << ":" << str << endl;
+ static void on_log_cb(struct mosquitto *m, void *self, int level, const char *str) {
+ static_cast<raw_mqtt_client *>(self)->on_log(level, str);
}
- void on_error() override {
- cout << "Oops, error" << endl;
+ void on_log(int level, const char *str) {
+ cout << "MQTT: " << level << ":" << str << endl;
}
};
-cassandra_future2 *insert_into_raw(unique_ptr<cassandra_session> &session, const SampleRecord &record) {
- cassandra_statement q("INSERT INTO raw_record(day, timestamp, records) VALUES (?, ?, ?);", 3);
+cassandra_future2 *insert_into_raw(unique_ptr<cassandra_session> &session, const struct mosquitto_message *message,
+ const SampleRecord &record) {
+ cassandra_statement q("INSERT INTO raw_record(day, timestamp, mid, topic, qos, records) VALUES (?, ?, ?, ?, ?, ?);", 6);
auto system_now = system_clock::now();
char day[100];
std::time_t t = system_clock::to_time_t(system_now);
std::strftime(day, sizeof(day), "%Y-%m-%d", std::localtime(&t));
- q.bind(0, day);
+ q.bind_string(0, day);
cout << "day=" << day << endl;
auto now_ms = std::chrono::time_point_cast<std::chrono::milliseconds>(system_now);
auto timestamp = now_ms.time_since_epoch().count();
- q.bind(1, timestamp);
+ q.bind_int64(1, timestamp);
cout << "timestamp=" << timestamp << endl;
+ q.bind_int64(2, message->mid);
+ q.bind_string(3, message->topic);
+ q.bind_int32(4, (cass_int32_t)message->qos);
+
auto buf = make_shared<stringstream>();
auto output = trygvis::sensor::io::open_sample_output_stream(buf, record.dict, sample_format_type::KEY_VALUE);
output->write(record);
- cassandra_collection c(CASS_COLLECTION_TYPE_LIST, record.dict.size());
+ cassandra_collection records(CASS_COLLECTION_TYPE_LIST, record.dict.size());
for_each(record.dict.begin(), record.dict.end(), [&](const SampleKey *key) {
cassandra_tuple tuple(2);
tuple.set(0, key->name);
@@ -113,10 +154,10 @@ cassandra_future2 *insert_into_raw(unique_ptr<cassandra_session> &session, const
if (value) {
tuple.set(1, value.get());
}
- c.append(std::move(tuple));
+ records.append(std::move(tuple));
});
- q.bind(2, c);
+ q.bind_collection(5, records);
return session->execute2(std::move(q));
}
@@ -142,7 +183,7 @@ void on_message(const struct mosquitto_message *message) {
std::for_each(sample_buffer->samples.cbegin(), sample_buffer->samples.cend(), [&](auto &sample) {
cout << "Sample: " << sample.to_string() << endl;
- insert_into_raw(current_cassandra_session, sample)->then([&](auto &f) {
+ insert_into_raw(current_cassandra_session, message, sample)->then([&](auto &f) {
if (f)
cout << "Success!" << endl;
else {
@@ -215,6 +256,7 @@ int main(int argc, const char **argv) {
cout << "Connected to Cassandra" << endl;
current_cassandra_session = std::move(session);
+ cout << "Connecting to MQTT broker" << endl;
raw_mqtt_client mqtt_client(on_message);
should_run = true;
diff --git a/sm-http-server.cpp b/sm-http-server.cpp
index 80dd68f..e567655 100644
--- a/sm-http-server.cpp
+++ b/sm-http-server.cpp
@@ -59,10 +59,10 @@ void handle_device_get(const request &req, const response &res, string device) {
cout << "handle_device_get(" << device << ");" << endl;
cassandra_statement stmt("SELECT device, timestamp, sensors FROM sm_by_day WHERE device=? AND day IN ?", 2);
- stmt.bind(0, device);
+ stmt.bind_string(0, device);
vector<string> days = {"2015-07-10", "2015-07-11", "2015-07-12", "2015-07-13", "2015-07-14", "2015-07-15",
"2015-07-16"};
- stmt.bind(1, std::move(days));
+ stmt.bind_list(1, std::move(days));
current_cassandra_session->execute(std::move(stmt), [&](cassandra_future& future) {
const cassandra_result result = future.result();
diff --git a/sm-mqtt-consumer.cpp b/sm-mqtt-consumer.cpp
index 5a0a23e..778eddf 100644
--- a/sm-mqtt-consumer.cpp
+++ b/sm-mqtt-consumer.cpp
@@ -137,14 +137,14 @@ private:
cassandra_future2* insert_into_sm_by_day(cassandra_session *session, device_measurement &&measurement) {
cassandra_statement stmt("INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);", 4);
- stmt.bind(0, measurement.device);
+ stmt.bind_string(0, measurement.device);
std::time_t t = system_clock::to_time_t(measurement.timestamp);
char day[100];
std::strftime(day, sizeof(day), "%Y-%m-%d", std::localtime(&t));
- stmt.bind(1, day);
+ stmt.bind_string(1, day);
- stmt.bind(2, measurement.timestamp.time_since_epoch().count());
+ stmt.bind_int64(2, measurement.timestamp.time_since_epoch().count());
cassandra_collection sensors(CASS_COLLECTION_TYPE_LIST, measurement.sensors.size());
for_each(measurement.sensors.cbegin(), measurement.sensors.cend(), [&](auto sensor) {
@@ -154,7 +154,7 @@ cassandra_future2* insert_into_sm_by_day(cassandra_session *session, device_meas
sensors.append(std::move(tuple));
});
- stmt.bind(3, sensors);
+ stmt.bind_collection(3, sensors);
return session->execute2(std::move(stmt));
}
diff --git a/trireme/db/migrations/2015-08-04_2_more_details_to_raw.cql b/trireme/db/migrations/2015-08-04_2_more_details_to_raw.cql
new file mode 100644
index 0000000..3076add
--- /dev/null
+++ b/trireme/db/migrations/2015-08-04_2_more_details_to_raw.cql
@@ -0,0 +1,8 @@
+ALTER TABLE raw_record
+ADD mid bigint;
+
+ALTER TABLE raw_record
+ADD topic TEXT;
+
+ALTER TABLE raw_record
+ADD qos int;