aboutsummaryrefslogtreecommitdiff
path: root/cassandra_support.h
diff options
context:
space:
mode:
Diffstat (limited to 'cassandra_support.h')
-rw-r--r--cassandra_support.h162
1 files changed, 159 insertions, 3 deletions
diff --git a/cassandra_support.h b/cassandra_support.h
index b0726c8..7b50296 100644
--- a/cassandra_support.h
+++ b/cassandra_support.h
@@ -17,7 +17,12 @@ using namespace std;
class cassandra_error : public runtime_error {
public:
- cassandra_error(const string &context, CassError error) : runtime_error("Cassandra error: context=" + context + ", error=" + cass_error_desc(error)) {
+ cassandra_error(const string &context, string &&error) : runtime_error(
+ "Cassandra error: context=" + context + ", error=" + error) {
+ }
+
+ cassandra_error(const string &context, CassError error) : runtime_error(
+ "Cassandra error: context=" + context + ", error=" + cass_error_desc(error)) {
}
};
@@ -50,6 +55,122 @@ void assert_ok(const string &context, CassError &err) {
throw cassandra_error(context, err);
}
+template<typename Underlying>
+class cassandra_wrapper {
+public:
+ inline
+ operator const Underlying *() const {
+ return underlying_;
+ }
+
+ inline
+ Underlying *underlying() const {
+ return underlying_;
+ }
+
+protected:
+ cassandra_wrapper(Underlying *underlying) : underlying_(underlying) {
+ }
+
+ cassandra_wrapper(cassandra_wrapper &&other) {
+ underlying_ = other.underlying_;
+ other.underlying_ = nullptr;
+ }
+
+ cassandra_wrapper(const cassandra_wrapper &) = delete;
+
+ cassandra_wrapper &operator=(const cassandra_wrapper &) = delete;
+
+ virtual ~cassandra_wrapper() {
+ underlying_ = (Underlying *)0xaabbccdd;
+ }
+
+ Underlying *underlying_;
+};
+
+class cassandra_result : public cassandra_wrapper<const CassResult> {
+public:
+ cassandra_result(const CassResult *result) : cassandra_wrapper(result) {
+ if (result == NULL) {
+ throw cassandra_error("cassandra_result()", "result is NULL");
+ }
+ }
+
+ cassandra_result(cassandra_result &&other) = default;
+
+ cassandra_result(const cassandra_result &) = delete;
+
+ ~cassandra_result() {
+ cass_result_free(underlying_);
+ }
+
+ cassandra_result &operator=(const cassandra_result &) = delete;
+};
+
+static const CassResult *r;
+
+class cassandra_future;
+
+typedef std::function<void(cassandra_future &)> callback_type;
+
+struct tmp {
+ callback_type cb;
+};
+
+class cassandra_future : public cassandra_wrapper<CassFuture> {
+public:
+ cassandra_future(CassFuture *future) : cassandra_wrapper(future) {
+ }
+
+ ~cassandra_future() {
+ cass_future_free(underlying_);
+ }
+
+ cassandra_future(const cassandra_future &) = delete;
+
+ cassandra_future &operator=(const cassandra_future &) = delete;
+
+ cassandra_result result() {
+ if (fetched) {
+ throw cassandra_error("cassandra_result::result()", "Already fetched");
+ }
+ fetched = true;
+ const CassResult *x = cass_future_get_result(underlying());
+ size_t count = cass_result_row_count(x);
+ return std::move(cassandra_result(x));
+ }
+
+ string error_message() {
+ const char *message;
+ size_t message_length;
+ cass_future_error_message(underlying(), &message, &message_length);
+ return string(message, message_length);
+ }
+
+ CassError error_code() const {
+ return cass_future_error_code(underlying());
+ }
+
+ bool operator!() const {
+ return error_code() != CASS_OK;
+ }
+
+ bool ok() const {
+ return error_code() == CASS_OK;
+ }
+
+ static void callback(CassFuture *f, void *data) {
+ tmp *tmp_ = static_cast<tmp *>(data);
+ auto cb = tmp_->cb;
+ delete tmp_;
+ cassandra_future c_f(f);
+ cb(c_f);
+ }
+
+private:
+ bool fetched = false;
+};
+
class cassandra_tuple {
public:
cassandra_tuple(size_t item_count) {
@@ -60,6 +181,10 @@ public:
cass_tuple_free(tuple);
}
+ cassandra_tuple(const cassandra_tuple &) = delete;
+
+ cassandra_tuple &operator=(const cassandra_tuple &) = delete;
+
void set(size_t i, cass_int32_t value) {
cass_tuple_set_int32(tuple, i, value);
}
@@ -77,6 +202,10 @@ public:
cass_collection_free(collection);
}
+ cassandra_collection(const cassandra_collection &) = delete;
+
+ cassandra_collection &operator=(const cassandra_collection &) = delete;
+
void append(const cassandra_tuple &&tuple) {
cass_collection_append_tuple(collection, tuple.tuple);
}
@@ -103,7 +232,7 @@ void handle_future(CassFuture *future, std::function<void(CassFuture *)> success
CassError rc = cass_future_error_code(future);
- if(rc == CASS_OK) {
+ if (rc == CASS_OK) {
success(future);
} else {
error(future, rc);
@@ -122,6 +251,10 @@ public:
cass_statement_free(statement);
}
+ cassandra_statement(const cassandra_statement &) = delete;
+
+ cassandra_statement &operator=(const cassandra_statement &) = delete;
+
void bind(size_t i, const string &value) {
auto err = cass_statement_bind_string(statement, i, value.c_str());
assert_ok("cass_statement_bind_string", err);
@@ -154,11 +287,19 @@ public:
c.append(value);
}
- std::cout << "values.size=" << values.size() << std::endl;
auto err = cass_statement_bind_collection(statement, i, c);
assert_ok("cass_statement_bind_collection", err);
}
+ operator CassStatement *() const {
+ return statement;
+ }
+
+ CassStatement *underlying() const {
+ return statement;
+ }
+
+private:
CassStatement *statement;
};
@@ -176,6 +317,21 @@ public:
cassandra_session &operator=(const cassandra_session &) = delete;
+ void execute(cassandra_statement &&stmt, callback_type cb_) {
+ auto statement = cass_statement_new("SELECT device, timestamp, sensors FROM sm_by_day", 0);
+ auto future = cass_session_execute(session, stmt.underlying());
+ cass_future_set_callback(future, cassandra_future::callback, new tmp{cb_});
+ }
+
+ operator CassSession *() const {
+ return session;
+ }
+
+ CassSession *underlying() const {
+ return session;
+ }
+
+private:
CassSession *session;
};