diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2015-08-07 17:39:23 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2015-08-07 17:39:23 +0200 |
commit | d2caf82ad16f8d31db6afdd69383ba1c04e02c32 (patch) | |
tree | f50b6625b91cee915631a5841330d812e6321882 /raw-mqtt-consumer.cpp | |
parent | 981ba5c5f30549dbd233fab1e52bb774fd3eebe1 (diff) | |
download | mqtt-cassandra-bridge-d2caf82ad16f8d31db6afdd69383ba1c04e02c32.tar.gz mqtt-cassandra-bridge-d2caf82ad16f8d31db6afdd69383ba1c04e02c32.tar.bz2 mqtt-cassandra-bridge-d2caf82ad16f8d31db6afdd69383ba1c04e02c32.tar.xz mqtt-cassandra-bridge-d2caf82ad16f8d31db6afdd69383ba1c04e02c32.zip |
o More explicit bind functions, it is useful because Cassandra is touchy with its types.
o Saving the topic and message id in the raw record table.
o Dropping the mosquittopp, it is not very useful.
Diffstat (limited to 'raw-mqtt-consumer.cpp')
-rw-r--r-- | raw-mqtt-consumer.cpp | 98 |
1 files changed, 70 insertions, 28 deletions
diff --git a/raw-mqtt-consumer.cpp b/raw-mqtt-consumer.cpp index 04453e6..3cc43be 100644 --- a/raw-mqtt-consumer.cpp +++ b/raw-mqtt-consumer.cpp @@ -23,89 +23,130 @@ static string keyspace_name = "soil_moisture"; static unique_ptr<cassandra_session> current_cassandra_session; -class raw_mqtt_client : private mosqpp::mosquittopp { +class raw_mqtt_client /*: private mosqpp::mosquittopp*/ { public: typedef std::function<void(const struct mosquitto_message *)> callback_t; - raw_mqtt_client(callback_t on_message_) : mosquittopp(), - on_message_(on_message_) { + raw_mqtt_client(callback_t on_message_) : on_message_(on_message_) { cout << "Connecting to " << mqtt_host << ":" << mqtt_port << endl; - loop_start(); - connect_async(mqtt_host.c_str(), mqtt_port, 10); + + mosquitto = mosquitto_new(nullptr, true, this); + if(!mosquitto) { + throw runtime_error("Could not initialize mosquitto instance"); + } + mosquitto_connect_callback_set(mosquitto, on_connect_cb); + mosquitto_disconnect_callback_set(mosquitto, on_disconnect_cb); + mosquitto_publish_callback_set(mosquitto, on_publish_cb); + mosquitto_message_callback_set(mosquitto, on_message_cb); + mosquitto_subscribe_callback_set(mosquitto, on_subscribe_cb); + mosquitto_unsubscribe_callback_set(mosquitto, on_unsubscribe_cb); + mosquitto_log_callback_set(mosquitto, on_log_cb); + + mosquitto_loop_start(mosquitto); + mosquitto_connect_async(mosquitto, mqtt_host.c_str(), mqtt_port, 10); } ~raw_mqtt_client() { - loop_stop(true); - disconnect(); + mosquitto_loop_stop(mosquitto, true); + mosquitto_disconnect(mosquitto); } private: callback_t on_message_; bool subscribed = false; + struct mosquitto *mosquitto; - void on_connect(int rc) override { + static void on_connect_cb(struct mosquitto *m, void *self, int rc) { + static_cast<raw_mqtt_client *>(self)->on_connect(rc); + } + + void on_connect(int rc) { cout << "Connected to MQTT broker, rc=" << rc << endl; // should_run = false; int qos = 0; if (!subscribed) { subscribed = true; cout << "Subscribing..." << endl; - subscribe(nullptr, mqtt_topic.c_str(), qos); + mosquitto_subscribe(mosquitto, nullptr, mqtt_topic.c_str(), qos); } } - void on_disconnect(int rc) override { + static void on_disconnect_cb(struct mosquitto *m, void *self, int rc) { + static_cast<raw_mqtt_client *>(self)->on_disconnect(rc); + } + + void on_disconnect(int rc) { subscribed = false; - cout << "Oops, disconnected, rc=" << rc << endl; + cout << "Oops, disconnected, rc=" << rc << ":" << mosqpp::strerror(rc) << endl; + } + + static void on_publish_cb(struct mosquitto *m, void *self, int rc) { + static_cast<raw_mqtt_client *>(self)->on_publish(rc); } - void on_publish(int mid) override { + void on_publish(int mid) { } - void on_message(const struct mosquitto_message *message) override { - string payload((const char *) message->payload, (size_t) message->payloadlen); + static void on_message_cb(struct mosquitto *m, void *self, const mosquitto_message *message) { + static_cast<raw_mqtt_client *>(self)->on_message(message); + } + + void on_message(const struct mosquitto_message *message) { on_message_(message); } - void on_subscribe(int mid, int qos_count, const int *granted_qos) override { + static void on_subscribe_cb(struct mosquitto *m, void *self, int mid, int qos_count, const int *granted_qos) { + static_cast<raw_mqtt_client *>(self)->on_subscribe(mid, qos_count, granted_qos); + } + + void on_subscribe(int mid, int qos_count, const int *granted_qos) { cout << "Subscribed" << endl; } - void on_unsubscribe(int mid) override { + static void on_unsubscribe_cb(struct mosquitto *m, void *self, int mid) { + static_cast<raw_mqtt_client *>(self)->on_unsubscribe(mid); + } + + void on_unsubscribe(int mid) { cout << "Oops, unsubscribed" << endl; } - void on_log(int level, const char *str) override { - cout << "MQTT: " << level << ":" << str << endl; + static void on_log_cb(struct mosquitto *m, void *self, int level, const char *str) { + static_cast<raw_mqtt_client *>(self)->on_log(level, str); } - void on_error() override { - cout << "Oops, error" << endl; + void on_log(int level, const char *str) { + cout << "MQTT: " << level << ":" << str << endl; } }; -cassandra_future2 *insert_into_raw(unique_ptr<cassandra_session> &session, const SampleRecord &record) { - cassandra_statement q("INSERT INTO raw_record(day, timestamp, records) VALUES (?, ?, ?);", 3); +cassandra_future2 *insert_into_raw(unique_ptr<cassandra_session> &session, const struct mosquitto_message *message, + const SampleRecord &record) { + cassandra_statement q("INSERT INTO raw_record(day, timestamp, mid, topic, qos, records) VALUES (?, ?, ?, ?, ?, ?);", 6); auto system_now = system_clock::now(); char day[100]; std::time_t t = system_clock::to_time_t(system_now); std::strftime(day, sizeof(day), "%Y-%m-%d", std::localtime(&t)); - q.bind(0, day); + q.bind_string(0, day); cout << "day=" << day << endl; auto now_ms = std::chrono::time_point_cast<std::chrono::milliseconds>(system_now); auto timestamp = now_ms.time_since_epoch().count(); - q.bind(1, timestamp); + q.bind_int64(1, timestamp); cout << "timestamp=" << timestamp << endl; + q.bind_int64(2, message->mid); + q.bind_string(3, message->topic); + q.bind_int32(4, (cass_int32_t)message->qos); + auto buf = make_shared<stringstream>(); auto output = trygvis::sensor::io::open_sample_output_stream(buf, record.dict, sample_format_type::KEY_VALUE); output->write(record); - cassandra_collection c(CASS_COLLECTION_TYPE_LIST, record.dict.size()); + cassandra_collection records(CASS_COLLECTION_TYPE_LIST, record.dict.size()); for_each(record.dict.begin(), record.dict.end(), [&](const SampleKey *key) { cassandra_tuple tuple(2); tuple.set(0, key->name); @@ -113,10 +154,10 @@ cassandra_future2 *insert_into_raw(unique_ptr<cassandra_session> &session, const if (value) { tuple.set(1, value.get()); } - c.append(std::move(tuple)); + records.append(std::move(tuple)); }); - q.bind(2, c); + q.bind_collection(5, records); return session->execute2(std::move(q)); } @@ -142,7 +183,7 @@ void on_message(const struct mosquitto_message *message) { std::for_each(sample_buffer->samples.cbegin(), sample_buffer->samples.cend(), [&](auto &sample) { cout << "Sample: " << sample.to_string() << endl; - insert_into_raw(current_cassandra_session, sample)->then([&](auto &f) { + insert_into_raw(current_cassandra_session, message, sample)->then([&](auto &f) { if (f) cout << "Success!" << endl; else { @@ -215,6 +256,7 @@ int main(int argc, const char **argv) { cout << "Connected to Cassandra" << endl; current_cassandra_session = std::move(session); + cout << "Connecting to MQTT broker" << endl; raw_mqtt_client mqtt_client(on_message); should_run = true; |