aboutsummaryrefslogtreecommitdiff
path: root/sm-mqtt-consumer.cpp
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2015-08-02 16:27:10 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2015-08-02 16:27:10 +0200
commit06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42 (patch)
treebc272faec6c585693341a814c65483e86c56255b /sm-mqtt-consumer.cpp
parentb632036b153297f83b10f6d960ccfe0c1772f00e (diff)
downloadmqtt-cassandra-bridge-06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42.tar.gz
mqtt-cassandra-bridge-06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42.tar.bz2
mqtt-cassandra-bridge-06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42.tar.xz
mqtt-cassandra-bridge-06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42.zip
o Adding a MQTT consumer that stores the parsed record in Cassandra.
Diffstat (limited to 'sm-mqtt-consumer.cpp')
-rw-r--r--sm-mqtt-consumer.cpp78
1 files changed, 20 insertions, 58 deletions
diff --git a/sm-mqtt-consumer.cpp b/sm-mqtt-consumer.cpp
index e1f6801..b91b663 100644
--- a/sm-mqtt-consumer.cpp
+++ b/sm-mqtt-consumer.cpp
@@ -1,15 +1,19 @@
#include "cassandra_support.h"
-#include "mosquittopp.h"
-#include "trygvis/sensor/io.h"
+#include "mqtt_support.h"
+#include "misc_support.h"
#include <thread>
-#include <boost/lexical_cast.hpp>
#include <boost/program_options.hpp>
+#include <trygvis/sensor/io.h>
+
+namespace sm_mqtt_consumer {
using namespace std;
using namespace std::chrono;
using namespace trygvis::sensor;
using namespace trygvis::sensor::io;
using namespace trygvis::cassandra_support;
+using namespace trygvis::mqtt_support;
+using namespace trygvis::misc_support;
using namespace boost;
namespace po = boost::program_options;
@@ -53,29 +57,12 @@ struct device_measurement {
}
};
-string &&to_string(CassFuture *f) {
- const char *message;
- size_t message_length;
- cass_future_error_message(f, &message, &message_length);
-
- return std::move(string(message, message_length));
-}
-
-class mqtt_lib {
-public:
- mqtt_lib() {
- mosquitto_lib_init();
- }
-
- ~mqtt_lib() {
- mosquitto_lib_cleanup();
- }
-};
-
class mqtt_client : private mosqpp::mosquittopp {
public:
- mqtt_client(std::function<void(const struct mosquitto_message *)> on_message_) : mosquittopp(),
- on_message_(on_message_) {
+ typedef std::function<void(const struct mosquitto_message *)> callback_t;
+
+ mqtt_client(callback_t on_message_) : mosquittopp(),
+ on_message_(on_message_) {
cout << "Connecting to " << mqtt_broker_host << ":" << mqtt_broker_port << endl;
loop_start();
connect_async(mqtt_broker_host.c_str(), mqtt_broker_port, 10);
@@ -87,7 +74,7 @@ public:
}
private:
- std::function<void(const struct mosquitto_message *)> on_message_;
+ callback_t on_message_;
bool subscribed = false;
void on_connect(int rc) override {
@@ -132,10 +119,6 @@ private:
}
};
-void print_error(CassFuture *future) {
- cout << "Cassandra error: " << error_message(future) << endl;
-}
-
auto insert_into_sm_by_day(CassSession *session, device_measurement &&measurement) {
cassandra_statement q("INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);", 4);
@@ -161,33 +144,6 @@ auto insert_into_sm_by_day(CassSession *session, device_measurement &&measuremen
return cass_session_execute(session, q);
}
-template<typename Target, typename Source>
-boost::optional<Target> map(boost::optional<Source> &a, std::function<Target(Source)> f) {
- if (!a.is_initialized()) {
- return boost::none;
- }
-
- return make_optional(f(a));
-}
-
-template<typename Target, typename Source>
-boost::optional<Target> flat_map(boost::optional<Source> &a, boost::optional<Target> (&f)(Source)) {
- if (!a.is_initialized()) {
- return boost::none;
- }
-
- return f(a.get());
-}
-
-template<typename Target, typename Source = string>
-boost::optional<Target> l_c(const Source source) {
- try {
- return boost::lexical_cast<Target>(source);
- } catch (bad_lexical_cast &e) {
- return boost::none;
- }
-};
-
void on_message(const struct mosquitto_message *message) {
string payload((const char *) message->payload, (size_t) message->payloadlen);
cout << "storing message: " << endl;
@@ -255,7 +211,7 @@ void on_message(const struct mosquitto_message *message) {
handle_future(insert_into_sm_by_day((CassSession*) current_cassandra_session.get(), std::move(measurement)), [&](auto future) {
cout << "Success!" << endl;
}, [&](auto future, auto err) {
- cout << "Failure: " << error_message(future) << endl;
+ cout << "Failure: " << cassandra_future::error_message(future) << endl;
});
} else {
cout << "Not connected to Cassandra" << endl;
@@ -306,7 +262,7 @@ int main(int argc, const char **argv) {
connect_future = cass_session_connect((CassSession*) session.get(), cluster);
if (cass_future_error_code(connect_future) != CASS_OK) {
- string s = to_string(connect_future);
+ string s = cassandra_future::error_message(connect_future);
cerr << "Could not connect to Cassandra:" << s << endl;
return EXIT_FAILURE;
}
@@ -326,3 +282,9 @@ int main(int argc, const char **argv) {
return 0;
}
+
+}
+
+int main(int argc, const char **argv) {
+ return sm_mqtt_consumer::main(argc, argv);
+}