diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2015-07-13 01:16:34 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2015-07-13 01:16:34 +0200 |
commit | f2ff3cfcdc503be98b7d4b9f24f313c5732a0c17 (patch) | |
tree | 100ba509749344d4dec02e3703b8cf4e283c6c63 /cassandra_support.h | |
parent | deb0c6cf01cb2b9994c77a6dd31341be8d1f1f4d (diff) | |
download | mqtt-cassandra-bridge-f2ff3cfcdc503be98b7d4b9f24f313c5732a0c17.tar.gz mqtt-cassandra-bridge-f2ff3cfcdc503be98b7d4b9f24f313c5732a0c17.tar.bz2 mqtt-cassandra-bridge-f2ff3cfcdc503be98b7d4b9f24f313c5732a0c17.tar.xz mqtt-cassandra-bridge-f2ff3cfcdc503be98b7d4b9f24f313c5732a0c17.zip |
o Fully functional reception from MQTT into Cassandra.
Diffstat (limited to 'cassandra_support.h')
-rw-r--r-- | cassandra_support.h | 151 |
1 files changed, 151 insertions, 0 deletions
diff --git a/cassandra_support.h b/cassandra_support.h new file mode 100644 index 0000000..167f69b --- /dev/null +++ b/cassandra_support.h @@ -0,0 +1,151 @@ +#ifndef TRYGVIS_CASSANDRA_SUPPORT_H +#define TRYGVIS_CASSANDRA_SUPPORT_H + +#include <stddef.h> +#include <string> +#include <cassandra.h> +#include <stdexcept> + +namespace trygvis { +namespace cassandra_support { + +using namespace std; + +class cassandra_error : runtime_error { +public: + cassandra_error(const string &context, CassError error) : runtime_error("Cassandra error: context=" + context + ", error=" + to_string((int)error)) { + } +}; + +string error_message(CassFuture *future) { + const char *message; + size_t message_length; + cass_future_error_message(future, &message, &message_length); + return string(message, message_length); +} + +static CassError execute_query(CassSession *session, const string &&query) { + CassStatement *statement = cass_statement_new(query.c_str(), 0); + + CassFuture *future = cass_session_execute(session, statement); + cass_future_wait(future); + + CassError rc = cass_future_error_code(future); + + cass_future_free(future); + cass_statement_free(statement); + + return rc; +} + +void assert_ok(const string &context, CassError &err) { + if (err == CASS_OK) { + return; + } + + throw cassandra_error(context, err); +} + +class cassandra_tuple { +public: + cassandra_tuple(size_t item_count) { + tuple = cass_tuple_new(item_count); + } + + ~cassandra_tuple() { + cass_tuple_free(tuple); + } + + void set(size_t i, cass_int32_t value) { + cass_tuple_set_int32(tuple, i, value); + } + + CassTuple *tuple; +}; + +class cassandra_collection { +public: + cassandra_collection(CassCollectionType type, size_t item_count) { + collection = cass_collection_new(type, item_count); + } + + ~cassandra_collection() { + cass_collection_free(collection); + } + + void append_tuple(cassandra_tuple &&tuple) { + cass_collection_append_tuple(collection, tuple.tuple); + } + + CassCollection *collection; +}; + +CassError wait_for_future(CassFuture *future) { + cass_future_wait(future); + + CassError rc = cass_future_error_code(future); + + cass_future_free(future); + + return rc; +}; + +class cassandra_statement { +public: + cassandra_statement(string q, size_t argument_count) { + statement = cass_statement_new(q.c_str(), argument_count); + }; + + ~cassandra_statement() { + cass_statement_free(statement); + } + + void bind(size_t i, const string &value) { + auto err = cass_statement_bind_string(statement, i, value.c_str()); + assert_ok("cass_statement_bind_string", err); + } + + void bind(size_t i, const char *value) { + auto err = cass_statement_bind_string(statement, i, value); + assert_ok("cass_statement_bind_string", err); + } + + void bind(size_t i, const cass_int64_t value) { + auto err = cass_statement_bind_int64(statement, i, value); + assert_ok("cass_statement_bind_int64", err); + } + + void bind(size_t i, const CassCollection *value) { + auto err = cass_statement_bind_collection(statement, i, value); + assert_ok("cass_statement_bind_collection", err); + } + + void bind(size_t i, const cassandra_collection &value) { + auto err = cass_statement_bind_collection(statement, i, value.collection); + assert_ok("cass_statement_bind_collection", err); + } + + CassStatement *statement; +}; + +class cassandra_session { +public: + cassandra_session() { + session = cass_session_new(); + } + + ~cassandra_session() { + cass_session_free(session); + } + + cassandra_session(const cassandra_session &) = delete; + + cassandra_session &operator=(const cassandra_session &) = delete; + + CassSession *session; +}; + +} +} + +#endif |