From b632036b153297f83b10f6d960ccfe0c1772f00e Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Thu, 16 Jul 2015 13:43:13 +0200 Subject: o More Cassandra wrappers. o Using more futures. --- cassandra_support.h | 162 +++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 159 insertions(+), 3 deletions(-) (limited to 'cassandra_support.h') diff --git a/cassandra_support.h b/cassandra_support.h index b0726c8..7b50296 100644 --- a/cassandra_support.h +++ b/cassandra_support.h @@ -17,7 +17,12 @@ using namespace std; class cassandra_error : public runtime_error { public: - cassandra_error(const string &context, CassError error) : runtime_error("Cassandra error: context=" + context + ", error=" + cass_error_desc(error)) { + cassandra_error(const string &context, string &&error) : runtime_error( + "Cassandra error: context=" + context + ", error=" + error) { + } + + cassandra_error(const string &context, CassError error) : runtime_error( + "Cassandra error: context=" + context + ", error=" + cass_error_desc(error)) { } }; @@ -50,6 +55,122 @@ void assert_ok(const string &context, CassError &err) { throw cassandra_error(context, err); } +template +class cassandra_wrapper { +public: + inline + operator const Underlying *() const { + return underlying_; + } + + inline + Underlying *underlying() const { + return underlying_; + } + +protected: + cassandra_wrapper(Underlying *underlying) : underlying_(underlying) { + } + + cassandra_wrapper(cassandra_wrapper &&other) { + underlying_ = other.underlying_; + other.underlying_ = nullptr; + } + + cassandra_wrapper(const cassandra_wrapper &) = delete; + + cassandra_wrapper &operator=(const cassandra_wrapper &) = delete; + + virtual ~cassandra_wrapper() { + underlying_ = (Underlying *)0xaabbccdd; + } + + Underlying *underlying_; +}; + +class cassandra_result : public cassandra_wrapper { +public: + cassandra_result(const CassResult *result) : cassandra_wrapper(result) { + if (result == NULL) { + throw cassandra_error("cassandra_result()", "result is NULL"); + } + } + + cassandra_result(cassandra_result &&other) = default; + + cassandra_result(const cassandra_result &) = delete; + + ~cassandra_result() { + cass_result_free(underlying_); + } + + cassandra_result &operator=(const cassandra_result &) = delete; +}; + +static const CassResult *r; + +class cassandra_future; + +typedef std::function callback_type; + +struct tmp { + callback_type cb; +}; + +class cassandra_future : public cassandra_wrapper { +public: + cassandra_future(CassFuture *future) : cassandra_wrapper(future) { + } + + ~cassandra_future() { + cass_future_free(underlying_); + } + + cassandra_future(const cassandra_future &) = delete; + + cassandra_future &operator=(const cassandra_future &) = delete; + + cassandra_result result() { + if (fetched) { + throw cassandra_error("cassandra_result::result()", "Already fetched"); + } + fetched = true; + const CassResult *x = cass_future_get_result(underlying()); + size_t count = cass_result_row_count(x); + return std::move(cassandra_result(x)); + } + + string error_message() { + const char *message; + size_t message_length; + cass_future_error_message(underlying(), &message, &message_length); + return string(message, message_length); + } + + CassError error_code() const { + return cass_future_error_code(underlying()); + } + + bool operator!() const { + return error_code() != CASS_OK; + } + + bool ok() const { + return error_code() == CASS_OK; + } + + static void callback(CassFuture *f, void *data) { + tmp *tmp_ = static_cast(data); + auto cb = tmp_->cb; + delete tmp_; + cassandra_future c_f(f); + cb(c_f); + } + +private: + bool fetched = false; +}; + class cassandra_tuple { public: cassandra_tuple(size_t item_count) { @@ -60,6 +181,10 @@ public: cass_tuple_free(tuple); } + cassandra_tuple(const cassandra_tuple &) = delete; + + cassandra_tuple &operator=(const cassandra_tuple &) = delete; + void set(size_t i, cass_int32_t value) { cass_tuple_set_int32(tuple, i, value); } @@ -77,6 +202,10 @@ public: cass_collection_free(collection); } + cassandra_collection(const cassandra_collection &) = delete; + + cassandra_collection &operator=(const cassandra_collection &) = delete; + void append(const cassandra_tuple &&tuple) { cass_collection_append_tuple(collection, tuple.tuple); } @@ -103,7 +232,7 @@ void handle_future(CassFuture *future, std::function success CassError rc = cass_future_error_code(future); - if(rc == CASS_OK) { + if (rc == CASS_OK) { success(future); } else { error(future, rc); @@ -122,6 +251,10 @@ public: cass_statement_free(statement); } + cassandra_statement(const cassandra_statement &) = delete; + + cassandra_statement &operator=(const cassandra_statement &) = delete; + void bind(size_t i, const string &value) { auto err = cass_statement_bind_string(statement, i, value.c_str()); assert_ok("cass_statement_bind_string", err); @@ -154,11 +287,19 @@ public: c.append(value); } - std::cout << "values.size=" << values.size() << std::endl; auto err = cass_statement_bind_collection(statement, i, c); assert_ok("cass_statement_bind_collection", err); } + operator CassStatement *() const { + return statement; + } + + CassStatement *underlying() const { + return statement; + } + +private: CassStatement *statement; }; @@ -176,6 +317,21 @@ public: cassandra_session &operator=(const cassandra_session &) = delete; + void execute(cassandra_statement &&stmt, callback_type cb_) { + auto statement = cass_statement_new("SELECT device, timestamp, sensors FROM sm_by_day", 0); + auto future = cass_session_execute(session, stmt.underlying()); + cass_future_set_callback(future, cassandra_future::callback, new tmp{cb_}); + } + + operator CassSession *() const { + return session; + } + + CassSession *underlying() const { + return session; + } + +private: CassSession *session; }; -- cgit v1.2.3