aboutsummaryrefslogtreecommitdiff
path: root/cassandra_support.h
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2015-08-02 16:27:10 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2015-08-02 16:27:10 +0200
commit06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42 (patch)
treebc272faec6c585693341a814c65483e86c56255b /cassandra_support.h
parentb632036b153297f83b10f6d960ccfe0c1772f00e (diff)
downloadmqtt-cassandra-bridge-06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42.tar.gz
mqtt-cassandra-bridge-06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42.tar.bz2
mqtt-cassandra-bridge-06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42.tar.xz
mqtt-cassandra-bridge-06c78fe0e2e4e7f0a5ba791571e1986a2e0dab42.zip
o Adding a MQTT consumer that stores the parsed record in Cassandra.
Diffstat (limited to 'cassandra_support.h')
-rw-r--r--cassandra_support.h221
1 files changed, 196 insertions, 25 deletions
diff --git a/cassandra_support.h b/cassandra_support.h
index 7b50296..3f49be2 100644
--- a/cassandra_support.h
+++ b/cassandra_support.h
@@ -1,14 +1,18 @@
#ifndef TRYGVIS_CASSANDRA_SUPPORT_H
#define TRYGVIS_CASSANDRA_SUPPORT_H
-#include <stddef.h>
+#include <cassandra.h>
+
#include <string>
+#include <cassert>
#include <algorithm>
-#include <cassandra.h>
#include <stdexcept>
#include <functional>
#include <vector>
#include <iostream>
+#include <sstream>
+#include <iomanip>
+#include <mutex>
namespace trygvis {
namespace cassandra_support {
@@ -17,7 +21,7 @@ using namespace std;
class cassandra_error : public runtime_error {
public:
- cassandra_error(const string &context, string &&error) : runtime_error(
+ cassandra_error(const string &context, const string &error) : runtime_error(
"Cassandra error: context=" + context + ", error=" + error) {
}
@@ -26,14 +30,7 @@ public:
}
};
-string error_message(CassFuture *future) {
- const char *message;
- size_t message_length;
- cass_future_error_message(future, &message, &message_length);
- return string(message, message_length);
-}
-
-static CassError execute_query(CassSession *session, const string &&query) {
+static CassError execute_query(CassSession *session, const string query) {
CassStatement *statement = cass_statement_new(query.c_str(), 0);
CassFuture *future = cass_session_execute(session, statement);
@@ -82,7 +79,7 @@ protected:
cassandra_wrapper &operator=(const cassandra_wrapper &) = delete;
virtual ~cassandra_wrapper() {
- underlying_ = (Underlying *)0xaabbccdd;
+ underlying_ = reinterpret_cast<Underlying *>(0xaabbccdd);
}
Underlying *underlying_;
@@ -107,18 +104,155 @@ public:
cassandra_result &operator=(const cassandra_result &) = delete;
};
-static const CassResult *r;
+class cassandra_future2 {
+public:
+ // TODO: the values shouldn't be the future, but a specific result object instead
+ typedef void(callback_t)(cassandra_future2 &);
+
+ static
+ cassandra_future2 *wrap(CassFuture *future) {
+ return new cassandra_future2(future);
+ }
+
+private:
+ typedef std::lock_guard<std::mutex> guard;
+
+ callback_t *callback_ = nullptr;
+ CassFuture *future;
+ bool has_callback = false;
+ bool has_data = false;
+
+ std::mutex mutex;
+
+ cassandra_future2(CassFuture *future) : future(future), callback_(default_callback) {
+ cout << "cassandra_future2: this=" << std::hex << std::setw(8) << this <<
+ ", future=" << std::hex << std::setw(8) << future << endl;
+ cass_future_set_callback(future, callback, this);
+ }
+
+ static void default_callback(cassandra_future2 &) {
+ cout << "default_callback" << endl;
+ }
+
+ ~cassandra_future2() {
+ cout << "~cassandra_future2: this=" << std::hex << std::setw(8) << this << endl;
+// cout << std::hex << std::setw(8) << "freeing future: " << underlying_ << endl;
+// cass_future_free(underlying_);
+ }
+
+ cassandra_future2(const cassandra_future2 &) = delete;
+
+ cassandra_future2 &operator=(const cassandra_future2 &) = delete;
+
+public:
+ void then(callback_t callback_) {
+ bool do_delete = false;
+
+ {
+ guard lock(mutex);
+
+ assert(callback_ != nullptr);
+ this->callback_ = callback_;
+ has_callback = true;
+
+ if (has_data) {
+ cout << "Had early data" << endl;
+ this->callback(future);
+
+ cout << "freeing future: " << std::hex << std::setw(8) << future << endl;
+ cass_future_free(future);
+ do_delete = true;
+ }
+ }
+
+ if (do_delete) {
+ delete this;
+ }
+ }
+
+ bool fetched = false;
+
+ cassandra_result result() {
+ if (fetched) {
+ throw cassandra_error("cassandra_result::result()", "Already fetched");
+ }
+ fetched = true;
+ const CassResult *x = cass_future_get_result(future);
+ size_t count = cass_result_row_count(x);
+ return std::move(cassandra_result(x));
+ }
+
+ static
+ string error_message(CassFuture * future) {
+ const char *message;
+ size_t message_length;
+ cass_future_error_message(future, &message, &message_length);
+ return string(message, message_length);
+ }
+
+ string error_message() {
+ return error_message(future);
+ }
+
+ CassError error_code() const {
+ return cass_future_error_code(future);
+ }
-class cassandra_future;
+ bool operator!() const {
+ return !ok();
+ }
-typedef std::function<void(cassandra_future &)> callback_type;
+ operator bool() const {
+ return ok();
+ }
-struct tmp {
- callback_type cb;
+ bool ok() const {
+ return error_code() == CASS_OK;
+ }
+
+private:
+
+ static void callback(CassFuture *f, void *data) {
+ cassandra_future2 *c_f = static_cast<cassandra_future2 *>(data);
+ c_f->callback(f);
+ }
+
+ void callback(CassFuture *future) {
+ bool do_delete = false;
+
+ {
+ guard lock(mutex);
+
+ cout << "cassandra_future::callback, error=" << cassandra_future2::error_message(future) << endl;
+ cout << "cassandra_future::callback, this=" << std::hex << std::setw(8) << (this) << endl;
+ CassError rc = cass_future_error_code(future);
+
+ has_data = true;
+ if (has_callback) {
+ cout << "had callback already" << endl;
+ callback_(*this);
+
+ cout << "freeing future: " << std::hex << std::setw(8) << future << endl;
+ cass_future_free(future);
+
+ do_delete = true;
+ }
+ }
+
+ if (do_delete) {
+ delete this;
+ }
+ }
};
class cassandra_future : public cassandra_wrapper<CassFuture> {
public:
+ typedef std::function<void(cassandra_future &)> callback_type;
+
+ struct tmp {
+ cassandra_future::callback_type cb;
+ };
+
cassandra_future(CassFuture *future) : cassandra_wrapper(future) {
}
@@ -140,11 +274,13 @@ public:
return std::move(cassandra_result(x));
}
+ static
+ string error_message(CassFuture *future) {
+ return cassandra_future2::error_message(future);
+ }
+
string error_message() {
- const char *message;
- size_t message_length;
- cass_future_error_message(underlying(), &message, &message_length);
- return string(message, message_length);
+ return error_message(underlying());
}
CassError error_code() const {
@@ -185,10 +321,14 @@ public:
cassandra_tuple &operator=(const cassandra_tuple &) = delete;
- void set(size_t i, cass_int32_t value) {
+ void set(size_t i, const cass_int32_t value) {
cass_tuple_set_int32(tuple, i, value);
}
+ void set(size_t i, const string &s) {
+ cass_tuple_set_string_n(tuple, i, s.c_str(), s.length());
+ }
+
CassTuple *tuple;
};
@@ -317,10 +457,19 @@ public:
cassandra_session &operator=(const cassandra_session &) = delete;
- void execute(cassandra_statement &&stmt, callback_type cb_) {
- auto statement = cass_statement_new("SELECT device, timestamp, sensors FROM sm_by_day", 0);
+ auto connect(CassCluster *cluster) {
+ return cass_session_connect(session, cluster);
+ }
+
+ [[deprecated]]
+ void execute(cassandra_statement &&stmt, cassandra_future::callback_type cb_) {
+ auto future = cass_session_execute(session, stmt.underlying());
+ cass_future_set_callback(future, cassandra_future::callback, new cassandra_future::tmp{cb_});
+ }
+
+ cassandra_future2 *execute2(cassandra_statement &&stmt) {
auto future = cass_session_execute(session, stmt.underlying());
- cass_future_set_callback(future, cassandra_future::callback, new tmp{cb_});
+ return cassandra_future2::wrap(future);
}
operator CassSession *() const {
@@ -335,6 +484,28 @@ private:
CassSession *session;
};
+class cassandra_logging {
+public:
+ cassandra_logging(CassLogLevel log_level = CASS_LOG_DEBUG) {
+ cass_log_set_level(log_level);
+ cass_log_set_callback(on_log, this);
+ }
+
+ ~cassandra_logging() {
+ }
+
+private:
+ static
+ void on_log(const CassLogMessage *message, void *data) {
+ stringstream buf;
+ buf << message->time_ms << " " << cass_log_level_string(message->severity) << " " <<
+ message->file << ":" << message->function << ":" <<
+ message->line << ":" << message->message;
+
+ cout << "CASSANDRA: " << buf.str() << endl;
+ }
+};
+
}
}