diff options
Diffstat (limited to 'main.cpp')
-rw-r--r-- | main.cpp | 39 |
1 files changed, 10 insertions, 29 deletions
@@ -136,39 +136,29 @@ void print_error(CassFuture *future) { cout << "Cassandra error: " << error_message(future) << endl; } -/* - CREATE TABLE sm_by_day ( - device text, - day text, - timestamp timestamp, - sensors list<frozen<tuple<int, int>>>, - PRIMARY KEY ((device, day), timestamp) - ) - */ -CassError insert_into_sm_by_day(CassSession *session, device_measurement &&measurement) { +auto insert_into_sm_by_day(CassSession *session, device_measurement &&measurement) { cassandra_statement q("INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);", 4); q.bind(0, measurement.device); - std::time_t t = std::time(NULL); + std::time_t t = measurement.timestamp; char day[100]; std::strftime(day, sizeof(day), "%Y-%m-%d", std::localtime(&t)); q.bind(1, day); - auto timestamp = std::time(NULL); - q.bind(2, timestamp); + q.bind(2, measurement.timestamp * 1000); cassandra_collection sensors(CASS_COLLECTION_TYPE_LIST, measurement.sensors.size()); for_each(measurement.sensors.cbegin(), measurement.sensors.cend(), [&](auto sensor) { cassandra_tuple tuple(2); tuple.set(0, sensor.sensor); tuple.set(1, sensor.value); - sensors.append_tuple(std::move(tuple)); + sensors.append(std::move(tuple)); }); q.bind(3, sensors); - return wait_for_future(cass_session_execute(session, q.statement)); + return cass_session_execute(session, q.statement); } template<typename Target, typename Source> @@ -189,15 +179,6 @@ boost::optional<Target> flat_map(boost::optional<Source> &a, boost::optional<Tar return f(a.get()); } -//template<typename Target, typename Source> -//boost::optional<Target> flat_map(boost::optional<Source> &a, std::function<boost::optional<Target>(Source)> f) { -// if (!a.is_initialized()) { -// return boost::none; -// } -// -// return f(a.get()); -//} - template<typename Target, typename Source = string> boost::optional<Target> l_c(const Source source) { try { @@ -271,11 +252,11 @@ void on_message(const struct mosquitto_message *message) { cout << "Measurement: " << measurement.str() << endl; if (current_cassandra_session) { - auto rc = insert_into_sm_by_day(current_cassandra_session->session, std::move(measurement)); - - cout << "rc=" << rc << endl; - - assert_ok("wait_for_future", rc); + handle_future(insert_into_sm_by_day(current_cassandra_session->session, std::move(measurement)), [&](auto future) { + cout << "Success!" << endl; + }, [&](auto future, auto err) { + cout << "Failure: " << error_message(future) << endl; + }); } else { cout << "Not connected to Cassandra" << endl; } |