From f2ff3cfcdc503be98b7d4b9f24f313c5732a0c17 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Mon, 13 Jul 2015 01:16:34 +0200 Subject: o Fully functional reception from MQTT into Cassandra. --- cassandra_support.h | 151 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 151 insertions(+) create mode 100644 cassandra_support.h (limited to 'cassandra_support.h') diff --git a/cassandra_support.h b/cassandra_support.h new file mode 100644 index 0000000..167f69b --- /dev/null +++ b/cassandra_support.h @@ -0,0 +1,151 @@ +#ifndef TRYGVIS_CASSANDRA_SUPPORT_H +#define TRYGVIS_CASSANDRA_SUPPORT_H + +#include +#include +#include +#include + +namespace trygvis { +namespace cassandra_support { + +using namespace std; + +class cassandra_error : runtime_error { +public: + cassandra_error(const string &context, CassError error) : runtime_error("Cassandra error: context=" + context + ", error=" + to_string((int)error)) { + } +}; + +string error_message(CassFuture *future) { + const char *message; + size_t message_length; + cass_future_error_message(future, &message, &message_length); + return string(message, message_length); +} + +static CassError execute_query(CassSession *session, const string &&query) { + CassStatement *statement = cass_statement_new(query.c_str(), 0); + + CassFuture *future = cass_session_execute(session, statement); + cass_future_wait(future); + + CassError rc = cass_future_error_code(future); + + cass_future_free(future); + cass_statement_free(statement); + + return rc; +} + +void assert_ok(const string &context, CassError &err) { + if (err == CASS_OK) { + return; + } + + throw cassandra_error(context, err); +} + +class cassandra_tuple { +public: + cassandra_tuple(size_t item_count) { + tuple = cass_tuple_new(item_count); + } + + ~cassandra_tuple() { + cass_tuple_free(tuple); + } + + void set(size_t i, cass_int32_t value) { + cass_tuple_set_int32(tuple, i, value); + } + + CassTuple *tuple; +}; + +class cassandra_collection { +public: + cassandra_collection(CassCollectionType type, size_t item_count) { + collection = cass_collection_new(type, item_count); + } + + ~cassandra_collection() { + cass_collection_free(collection); + } + + void append_tuple(cassandra_tuple &&tuple) { + cass_collection_append_tuple(collection, tuple.tuple); + } + + CassCollection *collection; +}; + +CassError wait_for_future(CassFuture *future) { + cass_future_wait(future); + + CassError rc = cass_future_error_code(future); + + cass_future_free(future); + + return rc; +}; + +class cassandra_statement { +public: + cassandra_statement(string q, size_t argument_count) { + statement = cass_statement_new(q.c_str(), argument_count); + }; + + ~cassandra_statement() { + cass_statement_free(statement); + } + + void bind(size_t i, const string &value) { + auto err = cass_statement_bind_string(statement, i, value.c_str()); + assert_ok("cass_statement_bind_string", err); + } + + void bind(size_t i, const char *value) { + auto err = cass_statement_bind_string(statement, i, value); + assert_ok("cass_statement_bind_string", err); + } + + void bind(size_t i, const cass_int64_t value) { + auto err = cass_statement_bind_int64(statement, i, value); + assert_ok("cass_statement_bind_int64", err); + } + + void bind(size_t i, const CassCollection *value) { + auto err = cass_statement_bind_collection(statement, i, value); + assert_ok("cass_statement_bind_collection", err); + } + + void bind(size_t i, const cassandra_collection &value) { + auto err = cass_statement_bind_collection(statement, i, value.collection); + assert_ok("cass_statement_bind_collection", err); + } + + CassStatement *statement; +}; + +class cassandra_session { +public: + cassandra_session() { + session = cass_session_new(); + } + + ~cassandra_session() { + cass_session_free(session); + } + + cassandra_session(const cassandra_session &) = delete; + + cassandra_session &operator=(const cassandra_session &) = delete; + + CassSession *session; +}; + +} +} + +#endif -- cgit v1.2.3