diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2015-07-16 13:43:13 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2015-07-16 13:43:13 +0200 |
commit | b632036b153297f83b10f6d960ccfe0c1772f00e (patch) | |
tree | fd56c04b7b08da0be010a5ce72162a3a474adf29 | |
parent | d77ebb924c1eeca345bbb3f1eeb2df3058a52a18 (diff) | |
download | mqtt-cassandra-bridge-b632036b153297f83b10f6d960ccfe0c1772f00e.tar.gz mqtt-cassandra-bridge-b632036b153297f83b10f6d960ccfe0c1772f00e.tar.bz2 mqtt-cassandra-bridge-b632036b153297f83b10f6d960ccfe0c1772f00e.tar.xz mqtt-cassandra-bridge-b632036b153297f83b10f6d960ccfe0c1772f00e.zip |
o More Cassandra wrappers.
o Using more futures.
-rw-r--r-- | CMakeLists.txt | 2 | ||||
-rw-r--r-- | cassandra_support.h | 162 | ||||
-rw-r--r-- | sm-http-server.cpp | 82 | ||||
-rw-r--r-- | sm-mqtt-consumer.cpp | 14 |
4 files changed, 211 insertions, 49 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 11ede97..75645c0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,7 +34,7 @@ ExternalProject_Add(nghttp2 PREFIX ${NGHTTP2} BUILD_ALWAYS 0 CONFIGURE_COMMAND cd <SOURCE_DIR> && autoreconf -i && automake && autoconf && ./configure --prefix=<INSTALL_DIR> --enable-asio-lib - BUILD_COMMAND cd <SOURCE_DIR> && make + BUILD_COMMAND cd <SOURCE_DIR> && "$(MAKE)" INSTALL_COMMAND cd <SOURCE_DIR> && make install ) diff --git a/cassandra_support.h b/cassandra_support.h index b0726c8..7b50296 100644 --- a/cassandra_support.h +++ b/cassandra_support.h @@ -17,7 +17,12 @@ using namespace std; class cassandra_error : public runtime_error { public: - cassandra_error(const string &context, CassError error) : runtime_error("Cassandra error: context=" + context + ", error=" + cass_error_desc(error)) { + cassandra_error(const string &context, string &&error) : runtime_error( + "Cassandra error: context=" + context + ", error=" + error) { + } + + cassandra_error(const string &context, CassError error) : runtime_error( + "Cassandra error: context=" + context + ", error=" + cass_error_desc(error)) { } }; @@ -50,6 +55,122 @@ void assert_ok(const string &context, CassError &err) { throw cassandra_error(context, err); } +template<typename Underlying> +class cassandra_wrapper { +public: + inline + operator const Underlying *() const { + return underlying_; + } + + inline + Underlying *underlying() const { + return underlying_; + } + +protected: + cassandra_wrapper(Underlying *underlying) : underlying_(underlying) { + } + + cassandra_wrapper(cassandra_wrapper &&other) { + underlying_ = other.underlying_; + other.underlying_ = nullptr; + } + + cassandra_wrapper(const cassandra_wrapper &) = delete; + + cassandra_wrapper &operator=(const cassandra_wrapper &) = delete; + + virtual ~cassandra_wrapper() { + underlying_ = (Underlying *)0xaabbccdd; + } + + Underlying *underlying_; +}; + +class cassandra_result : public cassandra_wrapper<const CassResult> { +public: + cassandra_result(const CassResult *result) : cassandra_wrapper(result) { + if (result == NULL) { + throw cassandra_error("cassandra_result()", "result is NULL"); + } + } + + cassandra_result(cassandra_result &&other) = default; + + cassandra_result(const cassandra_result &) = delete; + + ~cassandra_result() { + cass_result_free(underlying_); + } + + cassandra_result &operator=(const cassandra_result &) = delete; +}; + +static const CassResult *r; + +class cassandra_future; + +typedef std::function<void(cassandra_future &)> callback_type; + +struct tmp { + callback_type cb; +}; + +class cassandra_future : public cassandra_wrapper<CassFuture> { +public: + cassandra_future(CassFuture *future) : cassandra_wrapper(future) { + } + + ~cassandra_future() { + cass_future_free(underlying_); + } + + cassandra_future(const cassandra_future &) = delete; + + cassandra_future &operator=(const cassandra_future &) = delete; + + cassandra_result result() { + if (fetched) { + throw cassandra_error("cassandra_result::result()", "Already fetched"); + } + fetched = true; + const CassResult *x = cass_future_get_result(underlying()); + size_t count = cass_result_row_count(x); + return std::move(cassandra_result(x)); + } + + string error_message() { + const char *message; + size_t message_length; + cass_future_error_message(underlying(), &message, &message_length); + return string(message, message_length); + } + + CassError error_code() const { + return cass_future_error_code(underlying()); + } + + bool operator!() const { + return error_code() != CASS_OK; + } + + bool ok() const { + return error_code() == CASS_OK; + } + + static void callback(CassFuture *f, void *data) { + tmp *tmp_ = static_cast<tmp *>(data); + auto cb = tmp_->cb; + delete tmp_; + cassandra_future c_f(f); + cb(c_f); + } + +private: + bool fetched = false; +}; + class cassandra_tuple { public: cassandra_tuple(size_t item_count) { @@ -60,6 +181,10 @@ public: cass_tuple_free(tuple); } + cassandra_tuple(const cassandra_tuple &) = delete; + + cassandra_tuple &operator=(const cassandra_tuple &) = delete; + void set(size_t i, cass_int32_t value) { cass_tuple_set_int32(tuple, i, value); } @@ -77,6 +202,10 @@ public: cass_collection_free(collection); } + cassandra_collection(const cassandra_collection &) = delete; + + cassandra_collection &operator=(const cassandra_collection &) = delete; + void append(const cassandra_tuple &&tuple) { cass_collection_append_tuple(collection, tuple.tuple); } @@ -103,7 +232,7 @@ void handle_future(CassFuture *future, std::function<void(CassFuture *)> success CassError rc = cass_future_error_code(future); - if(rc == CASS_OK) { + if (rc == CASS_OK) { success(future); } else { error(future, rc); @@ -122,6 +251,10 @@ public: cass_statement_free(statement); } + cassandra_statement(const cassandra_statement &) = delete; + + cassandra_statement &operator=(const cassandra_statement &) = delete; + void bind(size_t i, const string &value) { auto err = cass_statement_bind_string(statement, i, value.c_str()); assert_ok("cass_statement_bind_string", err); @@ -154,11 +287,19 @@ public: c.append(value); } - std::cout << "values.size=" << values.size() << std::endl; auto err = cass_statement_bind_collection(statement, i, c); assert_ok("cass_statement_bind_collection", err); } + operator CassStatement *() const { + return statement; + } + + CassStatement *underlying() const { + return statement; + } + +private: CassStatement *statement; }; @@ -176,6 +317,21 @@ public: cassandra_session &operator=(const cassandra_session &) = delete; + void execute(cassandra_statement &&stmt, callback_type cb_) { + auto statement = cass_statement_new("SELECT device, timestamp, sensors FROM sm_by_day", 0); + auto future = cass_session_execute(session, stmt.underlying()); + cass_future_set_callback(future, cassandra_future::callback, new tmp{cb_}); + } + + operator CassSession *() const { + return session; + } + + CassSession *underlying() const { + return session; + } + +private: CassSession *session; }; diff --git a/sm-http-server.cpp b/sm-http-server.cpp index 7ac4e67..b1cd37e 100644 --- a/sm-http-server.cpp +++ b/sm-http-server.cpp @@ -1,11 +1,6 @@ #include "cassandra_support.h" #include "http_support.h" -#include <nghttp2/asio_http2_server.h> -#include <iostream> -#include <string> #include <boost/program_options.hpp> -#include <boost/algorithm/string.hpp> -#include <boost/algorithm/string/split.hpp> #include <cxxabi.h> using namespace std; @@ -53,7 +48,7 @@ cass_int32_t read_value_int32(const CassRow *row, const size_t index) { } void handle_device_get(const request &req, const response &res, string device) { - if(!current_cassandra_session) { + if (!current_cassandra_session) { header_map headers; headers.emplace("content-type", text_plain); res.write_head(503, headers); @@ -65,19 +60,30 @@ void handle_device_get(const request &req, const response &res, string device) { cassandra_statement stmt("SELECT device, timestamp, sensors FROM sm_by_day WHERE device=? AND day IN ?", 2); stmt.bind(0, device); - vector<string> days = {"2015-07-10", "2015-07-11", "2015-07-12", "2015-07-13", "2015-07-14", "2015-07-15", "2015-07-16"}; + vector<string> days = {"2015-07-10", "2015-07-11", "2015-07-12", "2015-07-13", "2015-07-14", "2015-07-15", + "2015-07-16"}; stmt.bind(1, std::move(days)); - auto f = cass_session_execute(current_cassandra_session->session, stmt.statement); - handle_future(f, [&](auto future) { + current_cassandra_session->execute(std::move(stmt), [&](cassandra_future& future) { + const cassandra_result result = future.result(); + auto x = result.underlying(); + if (!future.ok()) { + header_map headers; + headers.emplace("content-type", text_plain); + res.write_head(500, headers); + + stringstream buf; + buf << "Bad shit: " << future.error_message() << "\r\n"; + res.end(buf.str()); + return; + } + header_map headers; headers.emplace("content-type", application_json); res.write_head(200, headers); - const CassResult *result = cass_future_get_result(future); - size_t count = cass_result_row_count(result); - cout << "row count: " << count << endl; - CassIterator *rows = cass_iterator_from_result(result); + size_t count = cass_result_row_count(x); + CassIterator *rows = cass_iterator_from_result(x); stringstream buf; buf << "["; @@ -88,8 +94,7 @@ void handle_device_get(const request &req, const response &res, string device) { string d = read_string(row, 0); auto timestamp = read_value_int64(row, 1); -// auto sensors = read_string(row, 1); - int value = -1; +// auto sensors = read_list(row, 1); if (!first) { buf << ","; @@ -101,18 +106,9 @@ void handle_device_get(const request &req, const response &res, string device) { buf << endl << "]" << endl; - cass_result_free(result); cass_iterator_free(rows); res.end(buf.str() + "\r\n"); - }, [&](auto future, auto err) { - header_map headers; - headers.emplace("content-type", text_plain); - res.write_head(500, headers); - - stringstream buf; - buf << "Bad shit: " << error_message(future) << "\r\n"; - res.end(buf.str()); }); } @@ -120,7 +116,7 @@ using namespace __cxxabiv1; std::string util_demangle(std::string to_demangle) { int status = 0; - char * buff = __cxxabiv1::__cxa_demangle(to_demangle.c_str(), NULL, NULL, &status); + char *buff = __cxxabiv1::__cxa_demangle(to_demangle.c_str(), NULL, NULL, &status); std::string demangled = buff; std::free(buff); return demangled; @@ -139,6 +135,15 @@ void internal_server_error(const response &res, const string &msg) { res.end(s); } +void on_logging_from_cassandra(const CassLogMessage *message, void *data) { + stringstream buf; + buf << message->time_ms << " " << cass_log_level_string(message->severity) << " " << + message->file << ":" << message->function << ":" << + message->line << ":" << message->message; + + cout << "CASSANDRA: " << buf.str() << endl; +} + int main(int argc, const char *const argv[]) { string cassandra_cluster; po::options_description all("Options"); @@ -169,30 +174,31 @@ int main(int argc, const char *const argv[]) { return EXIT_FAILURE; } - CassFuture *connect_future = nullptr; - CassCluster *cluster = cass_cluster_new(); - auto session = make_unique<cassandra_session>(); + cass_log_set_level(CASS_LOG_INFO); + cass_log_set_callback(on_logging_from_cassandra, nullptr); + auto cluster = cass_cluster_new(); cass_cluster_set_contact_points(cluster, cassandra_cluster.c_str()); + cass_cluster_set_num_threads_io(cluster, 1); - connect_future = cass_session_connect(session->session, cluster); + current_cassandra_session = make_unique<cassandra_session>(); + auto connect_future = cass_session_connect(current_cassandra_session->underlying(), cluster); if (cass_future_error_code(connect_future) != CASS_OK) { string s = error_message(connect_future); - cerr << "Could not connect to Cassandra:" << s << endl; + cerr << "Could not connect to Cassandra: " << s << endl; return EXIT_FAILURE; } cout << "Connected to Cassandra" << endl; - current_cassandra_session = std::move(session); - execute_query(current_cassandra_session->session, "USE " + keyspace_name); + execute_query(current_cassandra_session->underlying(), "USE " + keyspace_name); boost::system::error_code ec; http2 server; server.num_threads(4); server.handle("/", [](const request &req, const response &res) { - cerr << req.method() << " " << req.uri().path << endl; + cout << req.method() << " " << req.uri().path << endl; vector<string> paths; auto &path = req.uri().path; @@ -218,9 +224,9 @@ int main(int argc, const char *const argv[]) { res.write_head(404); res.end("Not found :(\r\n"); } - } catch (const exception& ex) { + } catch (const exception &ex) { internal_server_error(res, ex.what()); - } catch (const string& ex) { + } catch (const string &ex) { internal_server_error(res, ex); } catch (...) { auto type = util_demangle(__cxa_current_exception_type()->name()); @@ -228,11 +234,11 @@ int main(int argc, const char *const argv[]) { } }); - std::cerr << "Starting server" << endl; + cout << "Starting HTTP listener" << endl; if (server.listen_and_serve(ec, "127.0.0.1", "3000")) { - std::cerr << "error: " << ec.message() << std::endl; + cout << "error: " << ec.message() << endl; } - std::cerr << "woot?" << endl; + cout << "woot?" << endl; return EXIT_SUCCESS; } diff --git a/sm-mqtt-consumer.cpp b/sm-mqtt-consumer.cpp index d7dbfdc..e1f6801 100644 --- a/sm-mqtt-consumer.cpp +++ b/sm-mqtt-consumer.cpp @@ -47,7 +47,7 @@ struct device_measurement { buf << "device=" << device; buf << ", timestamp=" << timestamp; std::for_each(sensors.begin(), sensors.end(), [&](auto &sensor) { - buf << ", #" << sensor.sensor << "=" + sensor.value; + buf << ", #" << sensor.sensor << "=" << sensor.value; }); return buf.str(); } @@ -158,7 +158,7 @@ auto insert_into_sm_by_day(CassSession *session, device_measurement &&measuremen q.bind(3, sensors); - return cass_session_execute(session, q.statement); + return cass_session_execute(session, q); } template<typename Target, typename Source> @@ -252,7 +252,7 @@ void on_message(const struct mosquitto_message *message) { cout << "Measurement: " << measurement.str() << endl; if (current_cassandra_session) { - handle_future(insert_into_sm_by_day(current_cassandra_session->session, std::move(measurement)), [&](auto future) { + handle_future(insert_into_sm_by_day((CassSession*) current_cassandra_session.get(), std::move(measurement)), [&](auto future) { cout << "Success!" << endl; }, [&](auto future, auto err) { cout << "Failure: " << error_message(future) << endl; @@ -263,9 +263,9 @@ void on_message(const struct mosquitto_message *message) { }); } -int main(int argc, const char **argv) { - mqtt_lib mqtt_lib(); +mqtt_lib mqtt_lib; +int main(int argc, const char **argv) { string cassandra_cluster; po::options_description all("Options"); all.add_options()("cassandra-cluster", po::value<string>(&cassandra_cluster)->default_value("127.0.0.1")); @@ -303,7 +303,7 @@ int main(int argc, const char **argv) { cass_cluster_set_contact_points(cluster, cassandra_cluster.c_str()); - connect_future = cass_session_connect(session->session, cluster); + connect_future = cass_session_connect((CassSession*) session.get(), cluster); if (cass_future_error_code(connect_future) != CASS_OK) { string s = to_string(connect_future); @@ -314,7 +314,7 @@ int main(int argc, const char **argv) { cout << "Connected to Cassandra" << endl; current_cassandra_session = std::move(session); - execute_query(current_cassandra_session->session, "USE " + keyspace_name); + execute_query((CassSession*) current_cassandra_session.get(), "USE " + keyspace_name); should_run = true; while (should_run) { |