aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2015-07-16 13:43:13 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2015-07-16 13:43:13 +0200
commitb632036b153297f83b10f6d960ccfe0c1772f00e (patch)
treefd56c04b7b08da0be010a5ce72162a3a474adf29
parentd77ebb924c1eeca345bbb3f1eeb2df3058a52a18 (diff)
downloadmqtt-cassandra-bridge-b632036b153297f83b10f6d960ccfe0c1772f00e.tar.gz
mqtt-cassandra-bridge-b632036b153297f83b10f6d960ccfe0c1772f00e.tar.bz2
mqtt-cassandra-bridge-b632036b153297f83b10f6d960ccfe0c1772f00e.tar.xz
mqtt-cassandra-bridge-b632036b153297f83b10f6d960ccfe0c1772f00e.zip
o More Cassandra wrappers.
o Using more futures.
-rw-r--r--CMakeLists.txt2
-rw-r--r--cassandra_support.h162
-rw-r--r--sm-http-server.cpp82
-rw-r--r--sm-mqtt-consumer.cpp14
4 files changed, 211 insertions, 49 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 11ede97..75645c0 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -34,7 +34,7 @@ ExternalProject_Add(nghttp2
PREFIX ${NGHTTP2}
BUILD_ALWAYS 0
CONFIGURE_COMMAND cd <SOURCE_DIR> && autoreconf -i && automake && autoconf && ./configure --prefix=<INSTALL_DIR> --enable-asio-lib
- BUILD_COMMAND cd <SOURCE_DIR> && make
+ BUILD_COMMAND cd <SOURCE_DIR> && "$(MAKE)"
INSTALL_COMMAND cd <SOURCE_DIR> && make install
)
diff --git a/cassandra_support.h b/cassandra_support.h
index b0726c8..7b50296 100644
--- a/cassandra_support.h
+++ b/cassandra_support.h
@@ -17,7 +17,12 @@ using namespace std;
class cassandra_error : public runtime_error {
public:
- cassandra_error(const string &context, CassError error) : runtime_error("Cassandra error: context=" + context + ", error=" + cass_error_desc(error)) {
+ cassandra_error(const string &context, string &&error) : runtime_error(
+ "Cassandra error: context=" + context + ", error=" + error) {
+ }
+
+ cassandra_error(const string &context, CassError error) : runtime_error(
+ "Cassandra error: context=" + context + ", error=" + cass_error_desc(error)) {
}
};
@@ -50,6 +55,122 @@ void assert_ok(const string &context, CassError &err) {
throw cassandra_error(context, err);
}
+template<typename Underlying>
+class cassandra_wrapper {
+public:
+ inline
+ operator const Underlying *() const {
+ return underlying_;
+ }
+
+ inline
+ Underlying *underlying() const {
+ return underlying_;
+ }
+
+protected:
+ cassandra_wrapper(Underlying *underlying) : underlying_(underlying) {
+ }
+
+ cassandra_wrapper(cassandra_wrapper &&other) {
+ underlying_ = other.underlying_;
+ other.underlying_ = nullptr;
+ }
+
+ cassandra_wrapper(const cassandra_wrapper &) = delete;
+
+ cassandra_wrapper &operator=(const cassandra_wrapper &) = delete;
+
+ virtual ~cassandra_wrapper() {
+ underlying_ = (Underlying *)0xaabbccdd;
+ }
+
+ Underlying *underlying_;
+};
+
+class cassandra_result : public cassandra_wrapper<const CassResult> {
+public:
+ cassandra_result(const CassResult *result) : cassandra_wrapper(result) {
+ if (result == NULL) {
+ throw cassandra_error("cassandra_result()", "result is NULL");
+ }
+ }
+
+ cassandra_result(cassandra_result &&other) = default;
+
+ cassandra_result(const cassandra_result &) = delete;
+
+ ~cassandra_result() {
+ cass_result_free(underlying_);
+ }
+
+ cassandra_result &operator=(const cassandra_result &) = delete;
+};
+
+static const CassResult *r;
+
+class cassandra_future;
+
+typedef std::function<void(cassandra_future &)> callback_type;
+
+struct tmp {
+ callback_type cb;
+};
+
+class cassandra_future : public cassandra_wrapper<CassFuture> {
+public:
+ cassandra_future(CassFuture *future) : cassandra_wrapper(future) {
+ }
+
+ ~cassandra_future() {
+ cass_future_free(underlying_);
+ }
+
+ cassandra_future(const cassandra_future &) = delete;
+
+ cassandra_future &operator=(const cassandra_future &) = delete;
+
+ cassandra_result result() {
+ if (fetched) {
+ throw cassandra_error("cassandra_result::result()", "Already fetched");
+ }
+ fetched = true;
+ const CassResult *x = cass_future_get_result(underlying());
+ size_t count = cass_result_row_count(x);
+ return std::move(cassandra_result(x));
+ }
+
+ string error_message() {
+ const char *message;
+ size_t message_length;
+ cass_future_error_message(underlying(), &message, &message_length);
+ return string(message, message_length);
+ }
+
+ CassError error_code() const {
+ return cass_future_error_code(underlying());
+ }
+
+ bool operator!() const {
+ return error_code() != CASS_OK;
+ }
+
+ bool ok() const {
+ return error_code() == CASS_OK;
+ }
+
+ static void callback(CassFuture *f, void *data) {
+ tmp *tmp_ = static_cast<tmp *>(data);
+ auto cb = tmp_->cb;
+ delete tmp_;
+ cassandra_future c_f(f);
+ cb(c_f);
+ }
+
+private:
+ bool fetched = false;
+};
+
class cassandra_tuple {
public:
cassandra_tuple(size_t item_count) {
@@ -60,6 +181,10 @@ public:
cass_tuple_free(tuple);
}
+ cassandra_tuple(const cassandra_tuple &) = delete;
+
+ cassandra_tuple &operator=(const cassandra_tuple &) = delete;
+
void set(size_t i, cass_int32_t value) {
cass_tuple_set_int32(tuple, i, value);
}
@@ -77,6 +202,10 @@ public:
cass_collection_free(collection);
}
+ cassandra_collection(const cassandra_collection &) = delete;
+
+ cassandra_collection &operator=(const cassandra_collection &) = delete;
+
void append(const cassandra_tuple &&tuple) {
cass_collection_append_tuple(collection, tuple.tuple);
}
@@ -103,7 +232,7 @@ void handle_future(CassFuture *future, std::function<void(CassFuture *)> success
CassError rc = cass_future_error_code(future);
- if(rc == CASS_OK) {
+ if (rc == CASS_OK) {
success(future);
} else {
error(future, rc);
@@ -122,6 +251,10 @@ public:
cass_statement_free(statement);
}
+ cassandra_statement(const cassandra_statement &) = delete;
+
+ cassandra_statement &operator=(const cassandra_statement &) = delete;
+
void bind(size_t i, const string &value) {
auto err = cass_statement_bind_string(statement, i, value.c_str());
assert_ok("cass_statement_bind_string", err);
@@ -154,11 +287,19 @@ public:
c.append(value);
}
- std::cout << "values.size=" << values.size() << std::endl;
auto err = cass_statement_bind_collection(statement, i, c);
assert_ok("cass_statement_bind_collection", err);
}
+ operator CassStatement *() const {
+ return statement;
+ }
+
+ CassStatement *underlying() const {
+ return statement;
+ }
+
+private:
CassStatement *statement;
};
@@ -176,6 +317,21 @@ public:
cassandra_session &operator=(const cassandra_session &) = delete;
+ void execute(cassandra_statement &&stmt, callback_type cb_) {
+ auto statement = cass_statement_new("SELECT device, timestamp, sensors FROM sm_by_day", 0);
+ auto future = cass_session_execute(session, stmt.underlying());
+ cass_future_set_callback(future, cassandra_future::callback, new tmp{cb_});
+ }
+
+ operator CassSession *() const {
+ return session;
+ }
+
+ CassSession *underlying() const {
+ return session;
+ }
+
+private:
CassSession *session;
};
diff --git a/sm-http-server.cpp b/sm-http-server.cpp
index 7ac4e67..b1cd37e 100644
--- a/sm-http-server.cpp
+++ b/sm-http-server.cpp
@@ -1,11 +1,6 @@
#include "cassandra_support.h"
#include "http_support.h"
-#include <nghttp2/asio_http2_server.h>
-#include <iostream>
-#include <string>
#include <boost/program_options.hpp>
-#include <boost/algorithm/string.hpp>
-#include <boost/algorithm/string/split.hpp>
#include <cxxabi.h>
using namespace std;
@@ -53,7 +48,7 @@ cass_int32_t read_value_int32(const CassRow *row, const size_t index) {
}
void handle_device_get(const request &req, const response &res, string device) {
- if(!current_cassandra_session) {
+ if (!current_cassandra_session) {
header_map headers;
headers.emplace("content-type", text_plain);
res.write_head(503, headers);
@@ -65,19 +60,30 @@ void handle_device_get(const request &req, const response &res, string device) {
cassandra_statement stmt("SELECT device, timestamp, sensors FROM sm_by_day WHERE device=? AND day IN ?", 2);
stmt.bind(0, device);
- vector<string> days = {"2015-07-10", "2015-07-11", "2015-07-12", "2015-07-13", "2015-07-14", "2015-07-15", "2015-07-16"};
+ vector<string> days = {"2015-07-10", "2015-07-11", "2015-07-12", "2015-07-13", "2015-07-14", "2015-07-15",
+ "2015-07-16"};
stmt.bind(1, std::move(days));
- auto f = cass_session_execute(current_cassandra_session->session, stmt.statement);
- handle_future(f, [&](auto future) {
+ current_cassandra_session->execute(std::move(stmt), [&](cassandra_future& future) {
+ const cassandra_result result = future.result();
+ auto x = result.underlying();
+ if (!future.ok()) {
+ header_map headers;
+ headers.emplace("content-type", text_plain);
+ res.write_head(500, headers);
+
+ stringstream buf;
+ buf << "Bad shit: " << future.error_message() << "\r\n";
+ res.end(buf.str());
+ return;
+ }
+
header_map headers;
headers.emplace("content-type", application_json);
res.write_head(200, headers);
- const CassResult *result = cass_future_get_result(future);
- size_t count = cass_result_row_count(result);
- cout << "row count: " << count << endl;
- CassIterator *rows = cass_iterator_from_result(result);
+ size_t count = cass_result_row_count(x);
+ CassIterator *rows = cass_iterator_from_result(x);
stringstream buf;
buf << "[";
@@ -88,8 +94,7 @@ void handle_device_get(const request &req, const response &res, string device) {
string d = read_string(row, 0);
auto timestamp = read_value_int64(row, 1);
-// auto sensors = read_string(row, 1);
- int value = -1;
+// auto sensors = read_list(row, 1);
if (!first) {
buf << ",";
@@ -101,18 +106,9 @@ void handle_device_get(const request &req, const response &res, string device) {
buf << endl << "]" << endl;
- cass_result_free(result);
cass_iterator_free(rows);
res.end(buf.str() + "\r\n");
- }, [&](auto future, auto err) {
- header_map headers;
- headers.emplace("content-type", text_plain);
- res.write_head(500, headers);
-
- stringstream buf;
- buf << "Bad shit: " << error_message(future) << "\r\n";
- res.end(buf.str());
});
}
@@ -120,7 +116,7 @@ using namespace __cxxabiv1;
std::string util_demangle(std::string to_demangle) {
int status = 0;
- char * buff = __cxxabiv1::__cxa_demangle(to_demangle.c_str(), NULL, NULL, &status);
+ char *buff = __cxxabiv1::__cxa_demangle(to_demangle.c_str(), NULL, NULL, &status);
std::string demangled = buff;
std::free(buff);
return demangled;
@@ -139,6 +135,15 @@ void internal_server_error(const response &res, const string &msg) {
res.end(s);
}
+void on_logging_from_cassandra(const CassLogMessage *message, void *data) {
+ stringstream buf;
+ buf << message->time_ms << " " << cass_log_level_string(message->severity) << " " <<
+ message->file << ":" << message->function << ":" <<
+ message->line << ":" << message->message;
+
+ cout << "CASSANDRA: " << buf.str() << endl;
+}
+
int main(int argc, const char *const argv[]) {
string cassandra_cluster;
po::options_description all("Options");
@@ -169,30 +174,31 @@ int main(int argc, const char *const argv[]) {
return EXIT_FAILURE;
}
- CassFuture *connect_future = nullptr;
- CassCluster *cluster = cass_cluster_new();
- auto session = make_unique<cassandra_session>();
+ cass_log_set_level(CASS_LOG_INFO);
+ cass_log_set_callback(on_logging_from_cassandra, nullptr);
+ auto cluster = cass_cluster_new();
cass_cluster_set_contact_points(cluster, cassandra_cluster.c_str());
+ cass_cluster_set_num_threads_io(cluster, 1);
- connect_future = cass_session_connect(session->session, cluster);
+ current_cassandra_session = make_unique<cassandra_session>();
+ auto connect_future = cass_session_connect(current_cassandra_session->underlying(), cluster);
if (cass_future_error_code(connect_future) != CASS_OK) {
string s = error_message(connect_future);
- cerr << "Could not connect to Cassandra:" << s << endl;
+ cerr << "Could not connect to Cassandra: " << s << endl;
return EXIT_FAILURE;
}
cout << "Connected to Cassandra" << endl;
- current_cassandra_session = std::move(session);
- execute_query(current_cassandra_session->session, "USE " + keyspace_name);
+ execute_query(current_cassandra_session->underlying(), "USE " + keyspace_name);
boost::system::error_code ec;
http2 server;
server.num_threads(4);
server.handle("/", [](const request &req, const response &res) {
- cerr << req.method() << " " << req.uri().path << endl;
+ cout << req.method() << " " << req.uri().path << endl;
vector<string> paths;
auto &path = req.uri().path;
@@ -218,9 +224,9 @@ int main(int argc, const char *const argv[]) {
res.write_head(404);
res.end("Not found :(\r\n");
}
- } catch (const exception& ex) {
+ } catch (const exception &ex) {
internal_server_error(res, ex.what());
- } catch (const string& ex) {
+ } catch (const string &ex) {
internal_server_error(res, ex);
} catch (...) {
auto type = util_demangle(__cxa_current_exception_type()->name());
@@ -228,11 +234,11 @@ int main(int argc, const char *const argv[]) {
}
});
- std::cerr << "Starting server" << endl;
+ cout << "Starting HTTP listener" << endl;
if (server.listen_and_serve(ec, "127.0.0.1", "3000")) {
- std::cerr << "error: " << ec.message() << std::endl;
+ cout << "error: " << ec.message() << endl;
}
- std::cerr << "woot?" << endl;
+ cout << "woot?" << endl;
return EXIT_SUCCESS;
}
diff --git a/sm-mqtt-consumer.cpp b/sm-mqtt-consumer.cpp
index d7dbfdc..e1f6801 100644
--- a/sm-mqtt-consumer.cpp
+++ b/sm-mqtt-consumer.cpp
@@ -47,7 +47,7 @@ struct device_measurement {
buf << "device=" << device;
buf << ", timestamp=" << timestamp;
std::for_each(sensors.begin(), sensors.end(), [&](auto &sensor) {
- buf << ", #" << sensor.sensor << "=" + sensor.value;
+ buf << ", #" << sensor.sensor << "=" << sensor.value;
});
return buf.str();
}
@@ -158,7 +158,7 @@ auto insert_into_sm_by_day(CassSession *session, device_measurement &&measuremen
q.bind(3, sensors);
- return cass_session_execute(session, q.statement);
+ return cass_session_execute(session, q);
}
template<typename Target, typename Source>
@@ -252,7 +252,7 @@ void on_message(const struct mosquitto_message *message) {
cout << "Measurement: " << measurement.str() << endl;
if (current_cassandra_session) {
- handle_future(insert_into_sm_by_day(current_cassandra_session->session, std::move(measurement)), [&](auto future) {
+ handle_future(insert_into_sm_by_day((CassSession*) current_cassandra_session.get(), std::move(measurement)), [&](auto future) {
cout << "Success!" << endl;
}, [&](auto future, auto err) {
cout << "Failure: " << error_message(future) << endl;
@@ -263,9 +263,9 @@ void on_message(const struct mosquitto_message *message) {
});
}
-int main(int argc, const char **argv) {
- mqtt_lib mqtt_lib();
+mqtt_lib mqtt_lib;
+int main(int argc, const char **argv) {
string cassandra_cluster;
po::options_description all("Options");
all.add_options()("cassandra-cluster", po::value<string>(&cassandra_cluster)->default_value("127.0.0.1"));
@@ -303,7 +303,7 @@ int main(int argc, const char **argv) {
cass_cluster_set_contact_points(cluster, cassandra_cluster.c_str());
- connect_future = cass_session_connect(session->session, cluster);
+ connect_future = cass_session_connect((CassSession*) session.get(), cluster);
if (cass_future_error_code(connect_future) != CASS_OK) {
string s = to_string(connect_future);
@@ -314,7 +314,7 @@ int main(int argc, const char **argv) {
cout << "Connected to Cassandra" << endl;
current_cassandra_session = std::move(session);
- execute_query(current_cassandra_session->session, "USE " + keyspace_name);
+ execute_query((CassSession*) current_cassandra_session.get(), "USE " + keyspace_name);
should_run = true;
while (should_run) {