aboutsummaryrefslogtreecommitdiff
path: root/raw-mqtt-consumer.cpp
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2015-08-07 17:39:23 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2015-08-07 17:39:23 +0200
commitd2caf82ad16f8d31db6afdd69383ba1c04e02c32 (patch)
treef50b6625b91cee915631a5841330d812e6321882 /raw-mqtt-consumer.cpp
parent981ba5c5f30549dbd233fab1e52bb774fd3eebe1 (diff)
downloadmqtt-cassandra-bridge-d2caf82ad16f8d31db6afdd69383ba1c04e02c32.tar.gz
mqtt-cassandra-bridge-d2caf82ad16f8d31db6afdd69383ba1c04e02c32.tar.bz2
mqtt-cassandra-bridge-d2caf82ad16f8d31db6afdd69383ba1c04e02c32.tar.xz
mqtt-cassandra-bridge-d2caf82ad16f8d31db6afdd69383ba1c04e02c32.zip
o More explicit bind functions, it is useful because Cassandra is touchy with its types.
o Saving the topic and message id in the raw record table. o Dropping the mosquittopp, it is not very useful.
Diffstat (limited to 'raw-mqtt-consumer.cpp')
-rw-r--r--raw-mqtt-consumer.cpp98
1 files changed, 70 insertions, 28 deletions
diff --git a/raw-mqtt-consumer.cpp b/raw-mqtt-consumer.cpp
index 04453e6..3cc43be 100644
--- a/raw-mqtt-consumer.cpp
+++ b/raw-mqtt-consumer.cpp
@@ -23,89 +23,130 @@ static string keyspace_name = "soil_moisture";
static unique_ptr<cassandra_session> current_cassandra_session;
-class raw_mqtt_client : private mosqpp::mosquittopp {
+class raw_mqtt_client /*: private mosqpp::mosquittopp*/ {
public:
typedef std::function<void(const struct mosquitto_message *)> callback_t;
- raw_mqtt_client(callback_t on_message_) : mosquittopp(),
- on_message_(on_message_) {
+ raw_mqtt_client(callback_t on_message_) : on_message_(on_message_) {
cout << "Connecting to " << mqtt_host << ":" << mqtt_port << endl;
- loop_start();
- connect_async(mqtt_host.c_str(), mqtt_port, 10);
+
+ mosquitto = mosquitto_new(nullptr, true, this);
+ if(!mosquitto) {
+ throw runtime_error("Could not initialize mosquitto instance");
+ }
+ mosquitto_connect_callback_set(mosquitto, on_connect_cb);
+ mosquitto_disconnect_callback_set(mosquitto, on_disconnect_cb);
+ mosquitto_publish_callback_set(mosquitto, on_publish_cb);
+ mosquitto_message_callback_set(mosquitto, on_message_cb);
+ mosquitto_subscribe_callback_set(mosquitto, on_subscribe_cb);
+ mosquitto_unsubscribe_callback_set(mosquitto, on_unsubscribe_cb);
+ mosquitto_log_callback_set(mosquitto, on_log_cb);
+
+ mosquitto_loop_start(mosquitto);
+ mosquitto_connect_async(mosquitto, mqtt_host.c_str(), mqtt_port, 10);
}
~raw_mqtt_client() {
- loop_stop(true);
- disconnect();
+ mosquitto_loop_stop(mosquitto, true);
+ mosquitto_disconnect(mosquitto);
}
private:
callback_t on_message_;
bool subscribed = false;
+ struct mosquitto *mosquitto;
- void on_connect(int rc) override {
+ static void on_connect_cb(struct mosquitto *m, void *self, int rc) {
+ static_cast<raw_mqtt_client *>(self)->on_connect(rc);
+ }
+
+ void on_connect(int rc) {
cout << "Connected to MQTT broker, rc=" << rc << endl;
// should_run = false;
int qos = 0;
if (!subscribed) {
subscribed = true;
cout << "Subscribing..." << endl;
- subscribe(nullptr, mqtt_topic.c_str(), qos);
+ mosquitto_subscribe(mosquitto, nullptr, mqtt_topic.c_str(), qos);
}
}
- void on_disconnect(int rc) override {
+ static void on_disconnect_cb(struct mosquitto *m, void *self, int rc) {
+ static_cast<raw_mqtt_client *>(self)->on_disconnect(rc);
+ }
+
+ void on_disconnect(int rc) {
subscribed = false;
- cout << "Oops, disconnected, rc=" << rc << endl;
+ cout << "Oops, disconnected, rc=" << rc << ":" << mosqpp::strerror(rc) << endl;
+ }
+
+ static void on_publish_cb(struct mosquitto *m, void *self, int rc) {
+ static_cast<raw_mqtt_client *>(self)->on_publish(rc);
}
- void on_publish(int mid) override {
+ void on_publish(int mid) {
}
- void on_message(const struct mosquitto_message *message) override {
- string payload((const char *) message->payload, (size_t) message->payloadlen);
+ static void on_message_cb(struct mosquitto *m, void *self, const mosquitto_message *message) {
+ static_cast<raw_mqtt_client *>(self)->on_message(message);
+ }
+
+ void on_message(const struct mosquitto_message *message) {
on_message_(message);
}
- void on_subscribe(int mid, int qos_count, const int *granted_qos) override {
+ static void on_subscribe_cb(struct mosquitto *m, void *self, int mid, int qos_count, const int *granted_qos) {
+ static_cast<raw_mqtt_client *>(self)->on_subscribe(mid, qos_count, granted_qos);
+ }
+
+ void on_subscribe(int mid, int qos_count, const int *granted_qos) {
cout << "Subscribed" << endl;
}
- void on_unsubscribe(int mid) override {
+ static void on_unsubscribe_cb(struct mosquitto *m, void *self, int mid) {
+ static_cast<raw_mqtt_client *>(self)->on_unsubscribe(mid);
+ }
+
+ void on_unsubscribe(int mid) {
cout << "Oops, unsubscribed" << endl;
}
- void on_log(int level, const char *str) override {
- cout << "MQTT: " << level << ":" << str << endl;
+ static void on_log_cb(struct mosquitto *m, void *self, int level, const char *str) {
+ static_cast<raw_mqtt_client *>(self)->on_log(level, str);
}
- void on_error() override {
- cout << "Oops, error" << endl;
+ void on_log(int level, const char *str) {
+ cout << "MQTT: " << level << ":" << str << endl;
}
};
-cassandra_future2 *insert_into_raw(unique_ptr<cassandra_session> &session, const SampleRecord &record) {
- cassandra_statement q("INSERT INTO raw_record(day, timestamp, records) VALUES (?, ?, ?);", 3);
+cassandra_future2 *insert_into_raw(unique_ptr<cassandra_session> &session, const struct mosquitto_message *message,
+ const SampleRecord &record) {
+ cassandra_statement q("INSERT INTO raw_record(day, timestamp, mid, topic, qos, records) VALUES (?, ?, ?, ?, ?, ?);", 6);
auto system_now = system_clock::now();
char day[100];
std::time_t t = system_clock::to_time_t(system_now);
std::strftime(day, sizeof(day), "%Y-%m-%d", std::localtime(&t));
- q.bind(0, day);
+ q.bind_string(0, day);
cout << "day=" << day << endl;
auto now_ms = std::chrono::time_point_cast<std::chrono::milliseconds>(system_now);
auto timestamp = now_ms.time_since_epoch().count();
- q.bind(1, timestamp);
+ q.bind_int64(1, timestamp);
cout << "timestamp=" << timestamp << endl;
+ q.bind_int64(2, message->mid);
+ q.bind_string(3, message->topic);
+ q.bind_int32(4, (cass_int32_t)message->qos);
+
auto buf = make_shared<stringstream>();
auto output = trygvis::sensor::io::open_sample_output_stream(buf, record.dict, sample_format_type::KEY_VALUE);
output->write(record);
- cassandra_collection c(CASS_COLLECTION_TYPE_LIST, record.dict.size());
+ cassandra_collection records(CASS_COLLECTION_TYPE_LIST, record.dict.size());
for_each(record.dict.begin(), record.dict.end(), [&](const SampleKey *key) {
cassandra_tuple tuple(2);
tuple.set(0, key->name);
@@ -113,10 +154,10 @@ cassandra_future2 *insert_into_raw(unique_ptr<cassandra_session> &session, const
if (value) {
tuple.set(1, value.get());
}
- c.append(std::move(tuple));
+ records.append(std::move(tuple));
});
- q.bind(2, c);
+ q.bind_collection(5, records);
return session->execute2(std::move(q));
}
@@ -142,7 +183,7 @@ void on_message(const struct mosquitto_message *message) {
std::for_each(sample_buffer->samples.cbegin(), sample_buffer->samples.cend(), [&](auto &sample) {
cout << "Sample: " << sample.to_string() << endl;
- insert_into_raw(current_cassandra_session, sample)->then([&](auto &f) {
+ insert_into_raw(current_cassandra_session, message, sample)->then([&](auto &f) {
if (f)
cout << "Success!" << endl;
else {
@@ -215,6 +256,7 @@ int main(int argc, const char **argv) {
cout << "Connected to Cassandra" << endl;
current_cassandra_session = std::move(session);
+ cout << "Connecting to MQTT broker" << endl;
raw_mqtt_client mqtt_client(on_message);
should_run = true;