diff options
-rw-r--r-- | cassandra_support.h | 17 | ||||
-rw-r--r-- | mqtt_support.h | 1 | ||||
-rw-r--r-- | raw-mqtt-consumer.cpp | 98 | ||||
-rw-r--r-- | sm-http-server.cpp | 4 | ||||
-rw-r--r-- | sm-mqtt-consumer.cpp | 8 | ||||
-rw-r--r-- | trireme/db/migrations/2015-08-04_2_more_details_to_raw.cql | 8 |
6 files changed, 96 insertions, 40 deletions
diff --git a/cassandra_support.h b/cassandra_support.h index 3f49be2..2e3ea19 100644 --- a/cassandra_support.h +++ b/cassandra_support.h @@ -395,32 +395,37 @@ public: cassandra_statement &operator=(const cassandra_statement &) = delete; - void bind(size_t i, const string &value) { + void bind_string(size_t i, const string &value) { auto err = cass_statement_bind_string(statement, i, value.c_str()); assert_ok("cass_statement_bind_string", err); } - void bind(size_t i, const char *value) { + void bind_string(size_t i, const char *value) { auto err = cass_statement_bind_string(statement, i, value); assert_ok("cass_statement_bind_string", err); } - void bind(size_t i, const cass_int64_t value) { + void bind_int32(size_t i, const cass_int32_t value) { + auto err = cass_statement_bind_int32(statement, i, value); + assert_ok("cass_statement_bind_int32", err); + } + + void bind_int64(size_t i, const cass_int64_t value) { auto err = cass_statement_bind_int64(statement, i, value); assert_ok("cass_statement_bind_int64", err); } - void bind(size_t i, const CassCollection *value) { + void bind_collection(size_t i, const CassCollection *value) { auto err = cass_statement_bind_collection(statement, i, value); assert_ok("cass_statement_bind_collection", err); } - void bind(size_t i, const cassandra_collection &value) { + void bind_collection(size_t i, const cassandra_collection &value) { auto err = cass_statement_bind_collection(statement, i, value); assert_ok("cass_statement_bind_collection", err); } - void bind(size_t i, const std::vector<string> &&values) { + void bind_list(size_t i, const std::vector<string> &&values) { cassandra_collection c(CassCollectionType::CASS_COLLECTION_TYPE_LIST, values.size()); for (const auto &value : values) { diff --git a/mqtt_support.h b/mqtt_support.h index 9a9f7fe..2ed5134 100644 --- a/mqtt_support.h +++ b/mqtt_support.h @@ -2,6 +2,7 @@ #define TRYGVIS_MQTT_SUPPORT_H #include "mosquittopp.h" +#include "mosquitto.h" namespace trygvis { namespace mqtt_support { diff --git a/raw-mqtt-consumer.cpp b/raw-mqtt-consumer.cpp index 04453e6..3cc43be 100644 --- a/raw-mqtt-consumer.cpp +++ b/raw-mqtt-consumer.cpp @@ -23,89 +23,130 @@ static string keyspace_name = "soil_moisture"; static unique_ptr<cassandra_session> current_cassandra_session; -class raw_mqtt_client : private mosqpp::mosquittopp { +class raw_mqtt_client /*: private mosqpp::mosquittopp*/ { public: typedef std::function<void(const struct mosquitto_message *)> callback_t; - raw_mqtt_client(callback_t on_message_) : mosquittopp(), - on_message_(on_message_) { + raw_mqtt_client(callback_t on_message_) : on_message_(on_message_) { cout << "Connecting to " << mqtt_host << ":" << mqtt_port << endl; - loop_start(); - connect_async(mqtt_host.c_str(), mqtt_port, 10); + + mosquitto = mosquitto_new(nullptr, true, this); + if(!mosquitto) { + throw runtime_error("Could not initialize mosquitto instance"); + } + mosquitto_connect_callback_set(mosquitto, on_connect_cb); + mosquitto_disconnect_callback_set(mosquitto, on_disconnect_cb); + mosquitto_publish_callback_set(mosquitto, on_publish_cb); + mosquitto_message_callback_set(mosquitto, on_message_cb); + mosquitto_subscribe_callback_set(mosquitto, on_subscribe_cb); + mosquitto_unsubscribe_callback_set(mosquitto, on_unsubscribe_cb); + mosquitto_log_callback_set(mosquitto, on_log_cb); + + mosquitto_loop_start(mosquitto); + mosquitto_connect_async(mosquitto, mqtt_host.c_str(), mqtt_port, 10); } ~raw_mqtt_client() { - loop_stop(true); - disconnect(); + mosquitto_loop_stop(mosquitto, true); + mosquitto_disconnect(mosquitto); } private: callback_t on_message_; bool subscribed = false; + struct mosquitto *mosquitto; - void on_connect(int rc) override { + static void on_connect_cb(struct mosquitto *m, void *self, int rc) { + static_cast<raw_mqtt_client *>(self)->on_connect(rc); + } + + void on_connect(int rc) { cout << "Connected to MQTT broker, rc=" << rc << endl; // should_run = false; int qos = 0; if (!subscribed) { subscribed = true; cout << "Subscribing..." << endl; - subscribe(nullptr, mqtt_topic.c_str(), qos); + mosquitto_subscribe(mosquitto, nullptr, mqtt_topic.c_str(), qos); } } - void on_disconnect(int rc) override { + static void on_disconnect_cb(struct mosquitto *m, void *self, int rc) { + static_cast<raw_mqtt_client *>(self)->on_disconnect(rc); + } + + void on_disconnect(int rc) { subscribed = false; - cout << "Oops, disconnected, rc=" << rc << endl; + cout << "Oops, disconnected, rc=" << rc << ":" << mosqpp::strerror(rc) << endl; + } + + static void on_publish_cb(struct mosquitto *m, void *self, int rc) { + static_cast<raw_mqtt_client *>(self)->on_publish(rc); } - void on_publish(int mid) override { + void on_publish(int mid) { } - void on_message(const struct mosquitto_message *message) override { - string payload((const char *) message->payload, (size_t) message->payloadlen); + static void on_message_cb(struct mosquitto *m, void *self, const mosquitto_message *message) { + static_cast<raw_mqtt_client *>(self)->on_message(message); + } + + void on_message(const struct mosquitto_message *message) { on_message_(message); } - void on_subscribe(int mid, int qos_count, const int *granted_qos) override { + static void on_subscribe_cb(struct mosquitto *m, void *self, int mid, int qos_count, const int *granted_qos) { + static_cast<raw_mqtt_client *>(self)->on_subscribe(mid, qos_count, granted_qos); + } + + void on_subscribe(int mid, int qos_count, const int *granted_qos) { cout << "Subscribed" << endl; } - void on_unsubscribe(int mid) override { + static void on_unsubscribe_cb(struct mosquitto *m, void *self, int mid) { + static_cast<raw_mqtt_client *>(self)->on_unsubscribe(mid); + } + + void on_unsubscribe(int mid) { cout << "Oops, unsubscribed" << endl; } - void on_log(int level, const char *str) override { - cout << "MQTT: " << level << ":" << str << endl; + static void on_log_cb(struct mosquitto *m, void *self, int level, const char *str) { + static_cast<raw_mqtt_client *>(self)->on_log(level, str); } - void on_error() override { - cout << "Oops, error" << endl; + void on_log(int level, const char *str) { + cout << "MQTT: " << level << ":" << str << endl; } }; -cassandra_future2 *insert_into_raw(unique_ptr<cassandra_session> &session, const SampleRecord &record) { - cassandra_statement q("INSERT INTO raw_record(day, timestamp, records) VALUES (?, ?, ?);", 3); +cassandra_future2 *insert_into_raw(unique_ptr<cassandra_session> &session, const struct mosquitto_message *message, + const SampleRecord &record) { + cassandra_statement q("INSERT INTO raw_record(day, timestamp, mid, topic, qos, records) VALUES (?, ?, ?, ?, ?, ?);", 6); auto system_now = system_clock::now(); char day[100]; std::time_t t = system_clock::to_time_t(system_now); std::strftime(day, sizeof(day), "%Y-%m-%d", std::localtime(&t)); - q.bind(0, day); + q.bind_string(0, day); cout << "day=" << day << endl; auto now_ms = std::chrono::time_point_cast<std::chrono::milliseconds>(system_now); auto timestamp = now_ms.time_since_epoch().count(); - q.bind(1, timestamp); + q.bind_int64(1, timestamp); cout << "timestamp=" << timestamp << endl; + q.bind_int64(2, message->mid); + q.bind_string(3, message->topic); + q.bind_int32(4, (cass_int32_t)message->qos); + auto buf = make_shared<stringstream>(); auto output = trygvis::sensor::io::open_sample_output_stream(buf, record.dict, sample_format_type::KEY_VALUE); output->write(record); - cassandra_collection c(CASS_COLLECTION_TYPE_LIST, record.dict.size()); + cassandra_collection records(CASS_COLLECTION_TYPE_LIST, record.dict.size()); for_each(record.dict.begin(), record.dict.end(), [&](const SampleKey *key) { cassandra_tuple tuple(2); tuple.set(0, key->name); @@ -113,10 +154,10 @@ cassandra_future2 *insert_into_raw(unique_ptr<cassandra_session> &session, const if (value) { tuple.set(1, value.get()); } - c.append(std::move(tuple)); + records.append(std::move(tuple)); }); - q.bind(2, c); + q.bind_collection(5, records); return session->execute2(std::move(q)); } @@ -142,7 +183,7 @@ void on_message(const struct mosquitto_message *message) { std::for_each(sample_buffer->samples.cbegin(), sample_buffer->samples.cend(), [&](auto &sample) { cout << "Sample: " << sample.to_string() << endl; - insert_into_raw(current_cassandra_session, sample)->then([&](auto &f) { + insert_into_raw(current_cassandra_session, message, sample)->then([&](auto &f) { if (f) cout << "Success!" << endl; else { @@ -215,6 +256,7 @@ int main(int argc, const char **argv) { cout << "Connected to Cassandra" << endl; current_cassandra_session = std::move(session); + cout << "Connecting to MQTT broker" << endl; raw_mqtt_client mqtt_client(on_message); should_run = true; diff --git a/sm-http-server.cpp b/sm-http-server.cpp index 80dd68f..e567655 100644 --- a/sm-http-server.cpp +++ b/sm-http-server.cpp @@ -59,10 +59,10 @@ void handle_device_get(const request &req, const response &res, string device) { cout << "handle_device_get(" << device << ");" << endl; cassandra_statement stmt("SELECT device, timestamp, sensors FROM sm_by_day WHERE device=? AND day IN ?", 2); - stmt.bind(0, device); + stmt.bind_string(0, device); vector<string> days = {"2015-07-10", "2015-07-11", "2015-07-12", "2015-07-13", "2015-07-14", "2015-07-15", "2015-07-16"}; - stmt.bind(1, std::move(days)); + stmt.bind_list(1, std::move(days)); current_cassandra_session->execute(std::move(stmt), [&](cassandra_future& future) { const cassandra_result result = future.result(); diff --git a/sm-mqtt-consumer.cpp b/sm-mqtt-consumer.cpp index 5a0a23e..778eddf 100644 --- a/sm-mqtt-consumer.cpp +++ b/sm-mqtt-consumer.cpp @@ -137,14 +137,14 @@ private: cassandra_future2* insert_into_sm_by_day(cassandra_session *session, device_measurement &&measurement) { cassandra_statement stmt("INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);", 4); - stmt.bind(0, measurement.device); + stmt.bind_string(0, measurement.device); std::time_t t = system_clock::to_time_t(measurement.timestamp); char day[100]; std::strftime(day, sizeof(day), "%Y-%m-%d", std::localtime(&t)); - stmt.bind(1, day); + stmt.bind_string(1, day); - stmt.bind(2, measurement.timestamp.time_since_epoch().count()); + stmt.bind_int64(2, measurement.timestamp.time_since_epoch().count()); cassandra_collection sensors(CASS_COLLECTION_TYPE_LIST, measurement.sensors.size()); for_each(measurement.sensors.cbegin(), measurement.sensors.cend(), [&](auto sensor) { @@ -154,7 +154,7 @@ cassandra_future2* insert_into_sm_by_day(cassandra_session *session, device_meas sensors.append(std::move(tuple)); }); - stmt.bind(3, sensors); + stmt.bind_collection(3, sensors); return session->execute2(std::move(stmt)); } diff --git a/trireme/db/migrations/2015-08-04_2_more_details_to_raw.cql b/trireme/db/migrations/2015-08-04_2_more_details_to_raw.cql new file mode 100644 index 0000000..3076add --- /dev/null +++ b/trireme/db/migrations/2015-08-04_2_more_details_to_raw.cql @@ -0,0 +1,8 @@ +ALTER TABLE raw_record +ADD mid bigint; + +ALTER TABLE raw_record +ADD topic TEXT; + +ALTER TABLE raw_record +ADD qos int; |