#ifndef TRYGVIS_CASSANDRA_SUPPORT_H #define TRYGVIS_CASSANDRA_SUPPORT_H #include <cassandra.h> #include <string> #include <cassert> #include <algorithm> #include <stdexcept> #include <functional> #include <vector> #include <iostream> #include <sstream> #include <iomanip> #include <mutex> namespace trygvis { namespace cassandra_support { using namespace std; class cassandra_error : public runtime_error { public: cassandra_error(const string &context, const string &error) : runtime_error( "Cassandra error: context=" + context + ", error=" + error) { } cassandra_error(const string &context, CassError error) : runtime_error( "Cassandra error: context=" + context + ", error=" + cass_error_desc(error)) { } }; static CassError execute_query(CassSession *session, const string query) { CassStatement *statement = cass_statement_new(query.c_str(), 0); CassFuture *future = cass_session_execute(session, statement); cass_future_wait(future); CassError rc = cass_future_error_code(future); cass_future_free(future); cass_statement_free(statement); return rc; } void assert_ok(const string &context, CassError &err) { if (err == CASS_OK) { return; } throw cassandra_error(context, err); } template<typename Underlying> class cassandra_wrapper { public: inline operator const Underlying *() const { return underlying_; } inline Underlying *underlying() const { return underlying_; } protected: cassandra_wrapper(Underlying *underlying) : underlying_(underlying) { } cassandra_wrapper(cassandra_wrapper &&other) { underlying_ = other.underlying_; other.underlying_ = nullptr; } cassandra_wrapper(const cassandra_wrapper &) = delete; cassandra_wrapper &operator=(const cassandra_wrapper &) = delete; virtual ~cassandra_wrapper() { underlying_ = reinterpret_cast<Underlying *>(0xaabbccdd); } Underlying *underlying_; }; class cassandra_result : public cassandra_wrapper<const CassResult> { public: cassandra_result(const CassResult *result) : cassandra_wrapper(result) { if (result == NULL) { throw cassandra_error("cassandra_result()", "result is NULL"); } } cassandra_result(cassandra_result &&other) = default; cassandra_result(const cassandra_result &) = delete; ~cassandra_result() { cass_result_free(underlying_); } cassandra_result &operator=(const cassandra_result &) = delete; }; class cassandra_future2 { public: // TODO: the values shouldn't be the future, but a specific result object instead typedef void(callback_t)(cassandra_future2 &); static cassandra_future2 *wrap(CassFuture *future) { return new cassandra_future2(future); } private: typedef std::lock_guard<std::mutex> guard; callback_t *callback_ = nullptr; CassFuture *future; bool has_callback = false; bool has_data = false; std::mutex mutex; cassandra_future2(CassFuture *future) : future(future), callback_(default_callback) { cout << "cassandra_future2: this=" << std::hex << std::setw(8) << this << ", future=" << std::hex << std::setw(8) << future << endl; cass_future_set_callback(future, callback, this); } static void default_callback(cassandra_future2 &) { cout << "default_callback" << endl; } ~cassandra_future2() { cout << "~cassandra_future2: this=" << std::hex << std::setw(8) << this << endl; // cout << std::hex << std::setw(8) << "freeing future: " << underlying_ << endl; // cass_future_free(underlying_); } cassandra_future2(const cassandra_future2 &) = delete; cassandra_future2 &operator=(const cassandra_future2 &) = delete; public: void then(callback_t callback_) { bool do_delete = false; { guard lock(mutex); assert(callback_ != nullptr); this->callback_ = callback_; has_callback = true; if (has_data) { cout << "Had early data" << endl; this->callback(future); cout << "freeing future: " << std::hex << std::setw(8) << future << endl; cass_future_free(future); do_delete = true; } } if (do_delete) { delete this; } } bool fetched = false; cassandra_result result() { if (fetched) { throw cassandra_error("cassandra_result::result()", "Already fetched"); } fetched = true; const CassResult *x = cass_future_get_result(future); size_t count = cass_result_row_count(x); return std::move(cassandra_result(x)); } static string error_message(CassFuture * future) { const char *message; size_t message_length; cass_future_error_message(future, &message, &message_length); return string(message, message_length); } string error_message() { return error_message(future); } CassError error_code() const { return cass_future_error_code(future); } bool operator!() const { return !ok(); } operator bool() const { return ok(); } bool ok() const { return error_code() == CASS_OK; } private: static void callback(CassFuture *f, void *data) { cassandra_future2 *c_f = static_cast<cassandra_future2 *>(data); c_f->callback(f); } void callback(CassFuture *future) { bool do_delete = false; { guard lock(mutex); cout << "cassandra_future::callback, error=" << cassandra_future2::error_message(future) << endl; cout << "cassandra_future::callback, this=" << std::hex << std::setw(8) << (this) << endl; CassError rc = cass_future_error_code(future); has_data = true; if (has_callback) { cout << "had callback already" << endl; callback_(*this); cout << "freeing future: " << std::hex << std::setw(8) << future << endl; cass_future_free(future); do_delete = true; } } if (do_delete) { delete this; } } }; class cassandra_future : public cassandra_wrapper<CassFuture> { public: typedef std::function<void(cassandra_future &)> callback_type; struct tmp { cassandra_future::callback_type cb; }; cassandra_future(CassFuture *future) : cassandra_wrapper(future) { } ~cassandra_future() { cass_future_free(underlying_); } cassandra_future(const cassandra_future &) = delete; cassandra_future &operator=(const cassandra_future &) = delete; cassandra_result result() { if (fetched) { throw cassandra_error("cassandra_result::result()", "Already fetched"); } fetched = true; const CassResult *x = cass_future_get_result(underlying()); size_t count = cass_result_row_count(x); return std::move(cassandra_result(x)); } static string error_message(CassFuture *future) { return cassandra_future2::error_message(future); } string error_message() { return error_message(underlying()); } CassError error_code() const { return cass_future_error_code(underlying()); } bool operator!() const { return error_code() != CASS_OK; } bool ok() const { return error_code() == CASS_OK; } static void callback(CassFuture *f, void *data) { tmp *tmp_ = static_cast<tmp *>(data); auto cb = tmp_->cb; delete tmp_; cassandra_future c_f(f); cb(c_f); } private: bool fetched = false; }; class cassandra_tuple { public: cassandra_tuple(size_t item_count) { tuple = cass_tuple_new(item_count); } ~cassandra_tuple() { cass_tuple_free(tuple); } cassandra_tuple(const cassandra_tuple &) = delete; cassandra_tuple &operator=(const cassandra_tuple &) = delete; void set(size_t i, const cass_int32_t value) { cass_tuple_set_int32(tuple, i, value); } void set(size_t i, const string &s) { cass_tuple_set_string_n(tuple, i, s.c_str(), s.length()); } CassTuple *tuple; }; class cassandra_collection { public: cassandra_collection(CassCollectionType type, size_t item_count) { collection = cass_collection_new(type, item_count); } ~cassandra_collection() { cass_collection_free(collection); } cassandra_collection(const cassandra_collection &) = delete; cassandra_collection &operator=(const cassandra_collection &) = delete; void append(const cassandra_tuple &&tuple) { cass_collection_append_tuple(collection, tuple.tuple); } void append(const string &&value) { cass_collection_append_string(collection, value.c_str()); } void append(const string &value) { cass_collection_append_string(collection, value.c_str()); } operator CassCollection *() const { return collection; }; private: CassCollection *collection; }; void handle_future(CassFuture *future, std::function<void(CassFuture *)> success, std::function<void(CassFuture *, CassError)> error) { cass_future_wait(future); CassError rc = cass_future_error_code(future); if (rc == CASS_OK) { success(future); } else { error(future, rc); } cass_future_free(future); }; class cassandra_statement { public: cassandra_statement(string q, size_t argument_count) { statement = cass_statement_new(q.c_str(), argument_count); }; ~cassandra_statement() { cass_statement_free(statement); } cassandra_statement(const cassandra_statement &) = delete; cassandra_statement &operator=(const cassandra_statement &) = delete; void bind(size_t i, const string &value) { auto err = cass_statement_bind_string(statement, i, value.c_str()); assert_ok("cass_statement_bind_string", err); } void bind(size_t i, const char *value) { auto err = cass_statement_bind_string(statement, i, value); assert_ok("cass_statement_bind_string", err); } void bind(size_t i, const cass_int64_t value) { auto err = cass_statement_bind_int64(statement, i, value); assert_ok("cass_statement_bind_int64", err); } void bind(size_t i, const CassCollection *value) { auto err = cass_statement_bind_collection(statement, i, value); assert_ok("cass_statement_bind_collection", err); } void bind(size_t i, const cassandra_collection &value) { auto err = cass_statement_bind_collection(statement, i, value); assert_ok("cass_statement_bind_collection", err); } void bind(size_t i, const std::vector<string> &&values) { cassandra_collection c(CassCollectionType::CASS_COLLECTION_TYPE_LIST, values.size()); for (const auto &value : values) { c.append(value); } auto err = cass_statement_bind_collection(statement, i, c); assert_ok("cass_statement_bind_collection", err); } operator CassStatement *() const { return statement; } CassStatement *underlying() const { return statement; } private: CassStatement *statement; }; class cassandra_session { public: cassandra_session() { session = cass_session_new(); } ~cassandra_session() { cass_session_free(session); } cassandra_session(const cassandra_session &) = delete; cassandra_session &operator=(const cassandra_session &) = delete; auto connect(CassCluster *cluster) { return cass_session_connect(session, cluster); } [[deprecated]] void execute(cassandra_statement &&stmt, cassandra_future::callback_type cb_) { auto future = cass_session_execute(session, stmt.underlying()); cass_future_set_callback(future, cassandra_future::callback, new cassandra_future::tmp{cb_}); } cassandra_future2 *execute2(cassandra_statement &&stmt) { auto future = cass_session_execute(session, stmt.underlying()); return cassandra_future2::wrap(future); } operator CassSession *() const { return session; } CassSession *underlying() const { return session; } private: CassSession *session; }; class cassandra_logging { public: cassandra_logging(CassLogLevel log_level = CASS_LOG_DEBUG) { cass_log_set_level(log_level); cass_log_set_callback(on_log, this); } ~cassandra_logging() { } private: static void on_log(const CassLogMessage *message, void *data) { stringstream buf; buf << message->time_ms << " " << cass_log_level_string(message->severity) << " " << message->file << ":" << message->function << ":" << message->line << ":" << message->message; cout << "CASSANDRA: " << buf.str() << endl; } }; } } #endif