aboutsummaryrefslogtreecommitdiff
path: root/cassandra_support.h
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2015-07-13 01:16:34 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2015-07-13 01:16:34 +0200
commitf2ff3cfcdc503be98b7d4b9f24f313c5732a0c17 (patch)
tree100ba509749344d4dec02e3703b8cf4e283c6c63 /cassandra_support.h
parentdeb0c6cf01cb2b9994c77a6dd31341be8d1f1f4d (diff)
downloadmqtt-cassandra-bridge-f2ff3cfcdc503be98b7d4b9f24f313c5732a0c17.tar.gz
mqtt-cassandra-bridge-f2ff3cfcdc503be98b7d4b9f24f313c5732a0c17.tar.bz2
mqtt-cassandra-bridge-f2ff3cfcdc503be98b7d4b9f24f313c5732a0c17.tar.xz
mqtt-cassandra-bridge-f2ff3cfcdc503be98b7d4b9f24f313c5732a0c17.zip
o Fully functional reception from MQTT into Cassandra.
Diffstat (limited to 'cassandra_support.h')
-rw-r--r--cassandra_support.h151
1 files changed, 151 insertions, 0 deletions
diff --git a/cassandra_support.h b/cassandra_support.h
new file mode 100644
index 0000000..167f69b
--- /dev/null
+++ b/cassandra_support.h
@@ -0,0 +1,151 @@
+#ifndef TRYGVIS_CASSANDRA_SUPPORT_H
+#define TRYGVIS_CASSANDRA_SUPPORT_H
+
+#include <stddef.h>
+#include <string>
+#include <cassandra.h>
+#include <stdexcept>
+
+namespace trygvis {
+namespace cassandra_support {
+
+using namespace std;
+
+class cassandra_error : runtime_error {
+public:
+ cassandra_error(const string &context, CassError error) : runtime_error("Cassandra error: context=" + context + ", error=" + to_string((int)error)) {
+ }
+};
+
+string error_message(CassFuture *future) {
+ const char *message;
+ size_t message_length;
+ cass_future_error_message(future, &message, &message_length);
+ return string(message, message_length);
+}
+
+static CassError execute_query(CassSession *session, const string &&query) {
+ CassStatement *statement = cass_statement_new(query.c_str(), 0);
+
+ CassFuture *future = cass_session_execute(session, statement);
+ cass_future_wait(future);
+
+ CassError rc = cass_future_error_code(future);
+
+ cass_future_free(future);
+ cass_statement_free(statement);
+
+ return rc;
+}
+
+void assert_ok(const string &context, CassError &err) {
+ if (err == CASS_OK) {
+ return;
+ }
+
+ throw cassandra_error(context, err);
+}
+
+class cassandra_tuple {
+public:
+ cassandra_tuple(size_t item_count) {
+ tuple = cass_tuple_new(item_count);
+ }
+
+ ~cassandra_tuple() {
+ cass_tuple_free(tuple);
+ }
+
+ void set(size_t i, cass_int32_t value) {
+ cass_tuple_set_int32(tuple, i, value);
+ }
+
+ CassTuple *tuple;
+};
+
+class cassandra_collection {
+public:
+ cassandra_collection(CassCollectionType type, size_t item_count) {
+ collection = cass_collection_new(type, item_count);
+ }
+
+ ~cassandra_collection() {
+ cass_collection_free(collection);
+ }
+
+ void append_tuple(cassandra_tuple &&tuple) {
+ cass_collection_append_tuple(collection, tuple.tuple);
+ }
+
+ CassCollection *collection;
+};
+
+CassError wait_for_future(CassFuture *future) {
+ cass_future_wait(future);
+
+ CassError rc = cass_future_error_code(future);
+
+ cass_future_free(future);
+
+ return rc;
+};
+
+class cassandra_statement {
+public:
+ cassandra_statement(string q, size_t argument_count) {
+ statement = cass_statement_new(q.c_str(), argument_count);
+ };
+
+ ~cassandra_statement() {
+ cass_statement_free(statement);
+ }
+
+ void bind(size_t i, const string &value) {
+ auto err = cass_statement_bind_string(statement, i, value.c_str());
+ assert_ok("cass_statement_bind_string", err);
+ }
+
+ void bind(size_t i, const char *value) {
+ auto err = cass_statement_bind_string(statement, i, value);
+ assert_ok("cass_statement_bind_string", err);
+ }
+
+ void bind(size_t i, const cass_int64_t value) {
+ auto err = cass_statement_bind_int64(statement, i, value);
+ assert_ok("cass_statement_bind_int64", err);
+ }
+
+ void bind(size_t i, const CassCollection *value) {
+ auto err = cass_statement_bind_collection(statement, i, value);
+ assert_ok("cass_statement_bind_collection", err);
+ }
+
+ void bind(size_t i, const cassandra_collection &value) {
+ auto err = cass_statement_bind_collection(statement, i, value.collection);
+ assert_ok("cass_statement_bind_collection", err);
+ }
+
+ CassStatement *statement;
+};
+
+class cassandra_session {
+public:
+ cassandra_session() {
+ session = cass_session_new();
+ }
+
+ ~cassandra_session() {
+ cass_session_free(session);
+ }
+
+ cassandra_session(const cassandra_session &) = delete;
+
+ cassandra_session &operator=(const cassandra_session &) = delete;
+
+ CassSession *session;
+};
+
+}
+}
+
+#endif