From 06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 2 Aug 2015 16:27:10 +0200 Subject: o Adding a MQTT consumer that stores the parsed record in Cassandra. --- cassandra_support.h | 221 ++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 196 insertions(+), 25 deletions(-) (limited to 'cassandra_support.h') diff --git a/cassandra_support.h b/cassandra_support.h index 7b50296..3f49be2 100644 --- a/cassandra_support.h +++ b/cassandra_support.h @@ -1,14 +1,18 @@ #ifndef TRYGVIS_CASSANDRA_SUPPORT_H #define TRYGVIS_CASSANDRA_SUPPORT_H -#include +#include + #include +#include #include -#include #include #include #include #include +#include +#include +#include namespace trygvis { namespace cassandra_support { @@ -17,7 +21,7 @@ using namespace std; class cassandra_error : public runtime_error { public: - cassandra_error(const string &context, string &&error) : runtime_error( + cassandra_error(const string &context, const string &error) : runtime_error( "Cassandra error: context=" + context + ", error=" + error) { } @@ -26,14 +30,7 @@ public: } }; -string error_message(CassFuture *future) { - const char *message; - size_t message_length; - cass_future_error_message(future, &message, &message_length); - return string(message, message_length); -} - -static CassError execute_query(CassSession *session, const string &&query) { +static CassError execute_query(CassSession *session, const string query) { CassStatement *statement = cass_statement_new(query.c_str(), 0); CassFuture *future = cass_session_execute(session, statement); @@ -82,7 +79,7 @@ protected: cassandra_wrapper &operator=(const cassandra_wrapper &) = delete; virtual ~cassandra_wrapper() { - underlying_ = (Underlying *)0xaabbccdd; + underlying_ = reinterpret_cast(0xaabbccdd); } Underlying *underlying_; @@ -107,18 +104,155 @@ public: cassandra_result &operator=(const cassandra_result &) = delete; }; -static const CassResult *r; +class cassandra_future2 { +public: + // TODO: the values shouldn't be the future, but a specific result object instead + typedef void(callback_t)(cassandra_future2 &); + + static + cassandra_future2 *wrap(CassFuture *future) { + return new cassandra_future2(future); + } + +private: + typedef std::lock_guard guard; + + callback_t *callback_ = nullptr; + CassFuture *future; + bool has_callback = false; + bool has_data = false; + + std::mutex mutex; + + cassandra_future2(CassFuture *future) : future(future), callback_(default_callback) { + cout << "cassandra_future2: this=" << std::hex << std::setw(8) << this << + ", future=" << std::hex << std::setw(8) << future << endl; + cass_future_set_callback(future, callback, this); + } + + static void default_callback(cassandra_future2 &) { + cout << "default_callback" << endl; + } + + ~cassandra_future2() { + cout << "~cassandra_future2: this=" << std::hex << std::setw(8) << this << endl; +// cout << std::hex << std::setw(8) << "freeing future: " << underlying_ << endl; +// cass_future_free(underlying_); + } + + cassandra_future2(const cassandra_future2 &) = delete; + + cassandra_future2 &operator=(const cassandra_future2 &) = delete; + +public: + void then(callback_t callback_) { + bool do_delete = false; + + { + guard lock(mutex); + + assert(callback_ != nullptr); + this->callback_ = callback_; + has_callback = true; + + if (has_data) { + cout << "Had early data" << endl; + this->callback(future); + + cout << "freeing future: " << std::hex << std::setw(8) << future << endl; + cass_future_free(future); + do_delete = true; + } + } + + if (do_delete) { + delete this; + } + } + + bool fetched = false; + + cassandra_result result() { + if (fetched) { + throw cassandra_error("cassandra_result::result()", "Already fetched"); + } + fetched = true; + const CassResult *x = cass_future_get_result(future); + size_t count = cass_result_row_count(x); + return std::move(cassandra_result(x)); + } + + static + string error_message(CassFuture * future) { + const char *message; + size_t message_length; + cass_future_error_message(future, &message, &message_length); + return string(message, message_length); + } + + string error_message() { + return error_message(future); + } + + CassError error_code() const { + return cass_future_error_code(future); + } -class cassandra_future; + bool operator!() const { + return !ok(); + } -typedef std::function callback_type; + operator bool() const { + return ok(); + } -struct tmp { - callback_type cb; + bool ok() const { + return error_code() == CASS_OK; + } + +private: + + static void callback(CassFuture *f, void *data) { + cassandra_future2 *c_f = static_cast(data); + c_f->callback(f); + } + + void callback(CassFuture *future) { + bool do_delete = false; + + { + guard lock(mutex); + + cout << "cassandra_future::callback, error=" << cassandra_future2::error_message(future) << endl; + cout << "cassandra_future::callback, this=" << std::hex << std::setw(8) << (this) << endl; + CassError rc = cass_future_error_code(future); + + has_data = true; + if (has_callback) { + cout << "had callback already" << endl; + callback_(*this); + + cout << "freeing future: " << std::hex << std::setw(8) << future << endl; + cass_future_free(future); + + do_delete = true; + } + } + + if (do_delete) { + delete this; + } + } }; class cassandra_future : public cassandra_wrapper { public: + typedef std::function callback_type; + + struct tmp { + cassandra_future::callback_type cb; + }; + cassandra_future(CassFuture *future) : cassandra_wrapper(future) { } @@ -140,11 +274,13 @@ public: return std::move(cassandra_result(x)); } + static + string error_message(CassFuture *future) { + return cassandra_future2::error_message(future); + } + string error_message() { - const char *message; - size_t message_length; - cass_future_error_message(underlying(), &message, &message_length); - return string(message, message_length); + return error_message(underlying()); } CassError error_code() const { @@ -185,10 +321,14 @@ public: cassandra_tuple &operator=(const cassandra_tuple &) = delete; - void set(size_t i, cass_int32_t value) { + void set(size_t i, const cass_int32_t value) { cass_tuple_set_int32(tuple, i, value); } + void set(size_t i, const string &s) { + cass_tuple_set_string_n(tuple, i, s.c_str(), s.length()); + } + CassTuple *tuple; }; @@ -317,10 +457,19 @@ public: cassandra_session &operator=(const cassandra_session &) = delete; - void execute(cassandra_statement &&stmt, callback_type cb_) { - auto statement = cass_statement_new("SELECT device, timestamp, sensors FROM sm_by_day", 0); + auto connect(CassCluster *cluster) { + return cass_session_connect(session, cluster); + } + + [[deprecated]] + void execute(cassandra_statement &&stmt, cassandra_future::callback_type cb_) { + auto future = cass_session_execute(session, stmt.underlying()); + cass_future_set_callback(future, cassandra_future::callback, new cassandra_future::tmp{cb_}); + } + + cassandra_future2 *execute2(cassandra_statement &&stmt) { auto future = cass_session_execute(session, stmt.underlying()); - cass_future_set_callback(future, cassandra_future::callback, new tmp{cb_}); + return cassandra_future2::wrap(future); } operator CassSession *() const { @@ -335,6 +484,28 @@ private: CassSession *session; }; +class cassandra_logging { +public: + cassandra_logging(CassLogLevel log_level = CASS_LOG_DEBUG) { + cass_log_set_level(log_level); + cass_log_set_callback(on_log, this); + } + + ~cassandra_logging() { + } + +private: + static + void on_log(const CassLogMessage *message, void *data) { + stringstream buf; + buf << message->time_ms << " " << cass_log_level_string(message->severity) << " " << + message->file << ":" << message->function << ":" << + message->line << ":" << message->message; + + cout << "CASSANDRA: " << buf.str() << endl; + } +}; + } } -- cgit v1.2.3