diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2015-07-14 01:04:41 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2015-07-14 01:04:41 +0200 |
commit | 643d2aaf8d5617487c26ba4d02af65dfcd3e0d88 (patch) | |
tree | 7e6be55672de7c45ae85f86863372dd94fc7605a /main.cpp | |
parent | f2ff3cfcdc503be98b7d4b9f24f313c5732a0c17 (diff) | |
download | mqtt-cassandra-bridge-643d2aaf8d5617487c26ba4d02af65dfcd3e0d88.tar.gz mqtt-cassandra-bridge-643d2aaf8d5617487c26ba4d02af65dfcd3e0d88.tar.bz2 mqtt-cassandra-bridge-643d2aaf8d5617487c26ba4d02af65dfcd3e0d88.tar.xz mqtt-cassandra-bridge-643d2aaf8d5617487c26ba4d02af65dfcd3e0d88.zip |
o Adding web server to serve responses.
Diffstat (limited to 'main.cpp')
-rw-r--r-- | main.cpp | 39 |
1 files changed, 10 insertions, 29 deletions
@@ -136,39 +136,29 @@ void print_error(CassFuture *future) { cout << "Cassandra error: " << error_message(future) << endl; } -/* - CREATE TABLE sm_by_day ( - device text, - day text, - timestamp timestamp, - sensors list<frozen<tuple<int, int>>>, - PRIMARY KEY ((device, day), timestamp) - ) - */ -CassError insert_into_sm_by_day(CassSession *session, device_measurement &&measurement) { +auto insert_into_sm_by_day(CassSession *session, device_measurement &&measurement) { cassandra_statement q("INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);", 4); q.bind(0, measurement.device); - std::time_t t = std::time(NULL); + std::time_t t = measurement.timestamp; char day[100]; std::strftime(day, sizeof(day), "%Y-%m-%d", std::localtime(&t)); q.bind(1, day); - auto timestamp = std::time(NULL); - q.bind(2, timestamp); + q.bind(2, measurement.timestamp * 1000); cassandra_collection sensors(CASS_COLLECTION_TYPE_LIST, measurement.sensors.size()); for_each(measurement.sensors.cbegin(), measurement.sensors.cend(), [&](auto sensor) { cassandra_tuple tuple(2); tuple.set(0, sensor.sensor); tuple.set(1, sensor.value); - sensors.append_tuple(std::move(tuple)); + sensors.append(std::move(tuple)); }); q.bind(3, sensors); - return wait_for_future(cass_session_execute(session, q.statement)); + return cass_session_execute(session, q.statement); } template<typename Target, typename Source> @@ -189,15 +179,6 @@ boost::optional<Target> flat_map(boost::optional<Source> &a, boost::optional<Tar return f(a.get()); } -//template<typename Target, typename Source> -//boost::optional<Target> flat_map(boost::optional<Source> &a, std::function<boost::optional<Target>(Source)> f) { -// if (!a.is_initialized()) { -// return boost::none; -// } -// -// return f(a.get()); -//} - template<typename Target, typename Source = string> boost::optional<Target> l_c(const Source source) { try { @@ -271,11 +252,11 @@ void on_message(const struct mosquitto_message *message) { cout << "Measurement: " << measurement.str() << endl; if (current_cassandra_session) { - auto rc = insert_into_sm_by_day(current_cassandra_session->session, std::move(measurement)); - - cout << "rc=" << rc << endl; - - assert_ok("wait_for_future", rc); + handle_future(insert_into_sm_by_day(current_cassandra_session->session, std::move(measurement)), [&](auto future) { + cout << "Success!" << endl; + }, [&](auto future, auto err) { + cout << "Failure: " << error_message(future) << endl; + }); } else { cout << "Not connected to Cassandra" << endl; } |