aboutsummaryrefslogtreecommitdiff
path: root/main.cpp
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2015-07-14 01:04:41 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2015-07-14 01:04:41 +0200
commit643d2aaf8d5617487c26ba4d02af65dfcd3e0d88 (patch)
tree7e6be55672de7c45ae85f86863372dd94fc7605a /main.cpp
parentf2ff3cfcdc503be98b7d4b9f24f313c5732a0c17 (diff)
downloadmqtt-cassandra-bridge-643d2aaf8d5617487c26ba4d02af65dfcd3e0d88.tar.gz
mqtt-cassandra-bridge-643d2aaf8d5617487c26ba4d02af65dfcd3e0d88.tar.bz2
mqtt-cassandra-bridge-643d2aaf8d5617487c26ba4d02af65dfcd3e0d88.tar.xz
mqtt-cassandra-bridge-643d2aaf8d5617487c26ba4d02af65dfcd3e0d88.zip
o Adding web server to serve responses.
Diffstat (limited to 'main.cpp')
-rw-r--r--main.cpp39
1 files changed, 10 insertions, 29 deletions
diff --git a/main.cpp b/main.cpp
index 744f697..d7dbfdc 100644
--- a/main.cpp
+++ b/main.cpp
@@ -136,39 +136,29 @@ void print_error(CassFuture *future) {
cout << "Cassandra error: " << error_message(future) << endl;
}
-/*
- CREATE TABLE sm_by_day (
- device text,
- day text,
- timestamp timestamp,
- sensors list<frozen<tuple<int, int>>>,
- PRIMARY KEY ((device, day), timestamp)
- )
- */
-CassError insert_into_sm_by_day(CassSession *session, device_measurement &&measurement) {
+auto insert_into_sm_by_day(CassSession *session, device_measurement &&measurement) {
cassandra_statement q("INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);", 4);
q.bind(0, measurement.device);
- std::time_t t = std::time(NULL);
+ std::time_t t = measurement.timestamp;
char day[100];
std::strftime(day, sizeof(day), "%Y-%m-%d", std::localtime(&t));
q.bind(1, day);
- auto timestamp = std::time(NULL);
- q.bind(2, timestamp);
+ q.bind(2, measurement.timestamp * 1000);
cassandra_collection sensors(CASS_COLLECTION_TYPE_LIST, measurement.sensors.size());
for_each(measurement.sensors.cbegin(), measurement.sensors.cend(), [&](auto sensor) {
cassandra_tuple tuple(2);
tuple.set(0, sensor.sensor);
tuple.set(1, sensor.value);
- sensors.append_tuple(std::move(tuple));
+ sensors.append(std::move(tuple));
});
q.bind(3, sensors);
- return wait_for_future(cass_session_execute(session, q.statement));
+ return cass_session_execute(session, q.statement);
}
template<typename Target, typename Source>
@@ -189,15 +179,6 @@ boost::optional<Target> flat_map(boost::optional<Source> &a, boost::optional<Tar
return f(a.get());
}
-//template<typename Target, typename Source>
-//boost::optional<Target> flat_map(boost::optional<Source> &a, std::function<boost::optional<Target>(Source)> f) {
-// if (!a.is_initialized()) {
-// return boost::none;
-// }
-//
-// return f(a.get());
-//}
-
template<typename Target, typename Source = string>
boost::optional<Target> l_c(const Source source) {
try {
@@ -271,11 +252,11 @@ void on_message(const struct mosquitto_message *message) {
cout << "Measurement: " << measurement.str() << endl;
if (current_cassandra_session) {
- auto rc = insert_into_sm_by_day(current_cassandra_session->session, std::move(measurement));
-
- cout << "rc=" << rc << endl;
-
- assert_ok("wait_for_future", rc);
+ handle_future(insert_into_sm_by_day(current_cassandra_session->session, std::move(measurement)), [&](auto future) {
+ cout << "Success!" << endl;
+ }, [&](auto future, auto err) {
+ cout << "Failure: " << error_message(future) << endl;
+ });
} else {
cout << "Not connected to Cassandra" << endl;
}