diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2015-07-14 01:04:41 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2015-07-14 01:04:41 +0200 |
commit | 643d2aaf8d5617487c26ba4d02af65dfcd3e0d88 (patch) | |
tree | 7e6be55672de7c45ae85f86863372dd94fc7605a | |
parent | f2ff3cfcdc503be98b7d4b9f24f313c5732a0c17 (diff) | |
download | mqtt-cassandra-bridge-643d2aaf8d5617487c26ba4d02af65dfcd3e0d88.tar.gz mqtt-cassandra-bridge-643d2aaf8d5617487c26ba4d02af65dfcd3e0d88.tar.bz2 mqtt-cassandra-bridge-643d2aaf8d5617487c26ba4d02af65dfcd3e0d88.tar.xz mqtt-cassandra-bridge-643d2aaf8d5617487c26ba4d02af65dfcd3e0d88.zip |
o Adding web server to serve responses.
-rw-r--r-- | CMakeLists.txt | 72 | ||||
-rw-r--r-- | README.md | 14 | ||||
-rw-r--r-- | cassandra_support.h | 48 | ||||
-rw-r--r-- | http-tests.cpp | 29 | ||||
-rw-r--r-- | http_support.h | 99 | ||||
-rw-r--r-- | main.cpp | 39 | ||||
-rw-r--r-- | sm_web_server.cpp | 238 |
7 files changed, 492 insertions, 47 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 74197cb..b9d9691 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,14 +1,12 @@ cmake_minimum_required(VERSION 3.2) project(mqtt_cassandra_bridge) -find_package(Boost COMPONENTS regex system program_options REQUIRED) - -add_executable(mqtt_cassandra_bridge main.cpp cassandra_support.h) -target_link_libraries(mqtt_cassandra_bridge PUBLIC ${Boost_LIBRARIES}) -target_compile_options(mqtt_cassandra_bridge PUBLIC "-std=c++14") +find_package(Boost COMPONENTS system program_options unit_test_framework REQUIRED) include(ExternalProject) +set(SHARED_COMPILE_OPTIONS "-std=c++14") + # Cassandra set(CPP_DRIVER ${CMAKE_CURRENT_BINARY_DIR}/cpp-driver) ExternalProject_Add(cpp-driver @@ -17,9 +15,6 @@ ExternalProject_Add(cpp-driver PREFIX ${CPP_DRIVER} CMAKE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=<INSTALL_DIR> ) -target_include_directories(mqtt_cassandra_bridge PRIVATE ${CPP_DRIVER}/include) -target_link_libraries(mqtt_cassandra_bridge PRIVATE ${CPP_DRIVER}/lib/libcassandra.so) -add_dependencies(mqtt_cassandra_bridge cpp-driver) # ble-toys set(BLE_TOYS ${CMAKE_CURRENT_BINARY_DIR}/ble-toys) @@ -27,13 +22,68 @@ ExternalProject_Add(ble-toys GIT_REPOSITORY https://trygvis.io/git/2015/02/ble-toys.git GIT_TAG 650fb016ce36cfda2e8073764196655ee6a50567 GIT_SUBMODULES json - BUILD_ALWAYS 0 PREFIX ${BLE_TOYS} CMAKE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=<INSTALL_DIR> ) + +# nghttp2 +set(NGHTTP2 ${CMAKE_CURRENT_BINARY_DIR}/nghttp2) +ExternalProject_Add(nghttp2 + URL https://github.com/tatsuhiro-t/nghttp2/releases/download/v1.0.5/nghttp2-1.0.5.tar.xz + URL_MD5 ff25d732d79128c4fa426393a635c21e + PREFIX ${NGHTTP2} + BUILD_ALWAYS 0 + CONFIGURE_COMMAND cd <SOURCE_DIR> && autoreconf -i && automake && autoconf && ./configure --prefix=<INSTALL_DIR> --enable-asio-lib + BUILD_COMMAND cd <SOURCE_DIR> && make + INSTALL_COMMAND cd <SOURCE_DIR> && make install +) + +# Mosquitto +# TODO: proper discovery + +# OpenSSL +# TODO: proper discovery + +# mqtt_cassandra_bridge +add_executable(mqtt_cassandra_bridge main.cpp cassandra_support.h) +add_dependencies(mqtt_cassandra_bridge cpp-driver) +## Boost +target_link_libraries(mqtt_cassandra_bridge PRIVATE ${Boost_LIBRARIES}) +## Cassandra +target_include_directories(mqtt_cassandra_bridge PRIVATE ${CPP_DRIVER}/include) +target_link_libraries(mqtt_cassandra_bridge PRIVATE ${CPP_DRIVER}/lib/libcassandra.so) +## Mosquitto +target_compile_options(mqtt_cassandra_bridge PUBLIC ${SHARED_COMPILE_OPTIONS}) +target_link_libraries(mqtt_cassandra_bridge PRIVATE mosquitto mosquittopp) +## Ble toys add_dependencies(mqtt_cassandra_bridge ble-toys) target_include_directories(mqtt_cassandra_bridge PRIVATE ${BLE_TOYS}/include) target_link_libraries(mqtt_cassandra_bridge PRIVATE ${BLE_TOYS}/lib/trygvis/libtrygvis-sensor.a) -# Mosquitto -target_link_libraries(mqtt_cassandra_bridge PRIVATE mosquitto mosquittopp) +# sm_web_server +add_executable(sm_web_server sm_web_server.cpp cassandra_support.h http_support.h) +target_compile_options(sm_web_server PUBLIC ${SHARED_COMPILE_OPTIONS}) +## Boost +target_link_libraries(sm_web_server PRIVATE ${Boost_LIBRARIES}) +## Cassandra +add_dependencies(sm_web_server cpp-driver) +target_include_directories(sm_web_server PRIVATE ${CPP_DRIVER}/include) +target_link_libraries(sm_web_server PRIVATE ${CPP_DRIVER}/lib/libcassandra.so) +## Nghttp2 +add_dependencies(sm_web_server nghttp2) +target_include_directories(sm_web_server PRIVATE ${NGHTTP2}/include) +target_link_libraries(sm_web_server PRIVATE ${NGHTTP2}/lib/libnghttp2_asio.a ${NGHTTP2}/lib/libnghttp2.a) +## Misc +target_link_libraries(sm_web_server PRIVATE ssl crypto pthread) + +enable_testing() +add_executable(http-tests http-tests.cpp http_support.h) +target_compile_options(http-tests PUBLIC ${SHARED_COMPILE_OPTIONS}) +add_dependencies(http-tests nghttp2) +target_include_directories(http-tests PRIVATE ${NGHTTP2}/include) +target_link_libraries(http-tests PRIVATE ${NGHTTP2}/lib/libnghttp2_asio.a ${NGHTTP2}/lib/libnghttp2.a) +target_link_libraries(http-tests PRIVATE ssl crypto pthread) +target_link_libraries(http-tests PRIVATE ${Boost_LIBRARIES}) +target_compile_definitions(http-tests PUBLIC -DBOOST_TEST_DYN_LINK) + +add_test(NAME http-tests COMMAND http-tests) @@ -1,3 +1,17 @@ +# Schema + + CREATE TABLE sm_by_day ( + device text, + day text, + timestamp timestamp, + sensors list<frozen<tuple<int, int>>>, + PRIMARY KEY ( + (device, day), + timestamp + ) + ); + + # Create Schema $ bin/cqlsh diff --git a/cassandra_support.h b/cassandra_support.h index 167f69b..b0726c8 100644 --- a/cassandra_support.h +++ b/cassandra_support.h @@ -3,17 +3,21 @@ #include <stddef.h> #include <string> +#include <algorithm> #include <cassandra.h> #include <stdexcept> +#include <functional> +#include <vector> +#include <iostream> namespace trygvis { namespace cassandra_support { using namespace std; -class cassandra_error : runtime_error { +class cassandra_error : public runtime_error { public: - cassandra_error(const string &context, CassError error) : runtime_error("Cassandra error: context=" + context + ", error=" + to_string((int)error)) { + cassandra_error(const string &context, CassError error) : runtime_error("Cassandra error: context=" + context + ", error=" + cass_error_desc(error)) { } }; @@ -73,21 +77,39 @@ public: cass_collection_free(collection); } - void append_tuple(cassandra_tuple &&tuple) { + void append(const cassandra_tuple &&tuple) { cass_collection_append_tuple(collection, tuple.tuple); } + void append(const string &&value) { + cass_collection_append_string(collection, value.c_str()); + } + + void append(const string &value) { + cass_collection_append_string(collection, value.c_str()); + } + + operator CassCollection *() const { + return collection; + }; + +private: CassCollection *collection; }; -CassError wait_for_future(CassFuture *future) { +void handle_future(CassFuture *future, std::function<void(CassFuture *)> success, + std::function<void(CassFuture *, CassError)> error) { cass_future_wait(future); CassError rc = cass_future_error_code(future); - cass_future_free(future); + if(rc == CASS_OK) { + success(future); + } else { + error(future, rc); + } - return rc; + cass_future_free(future); }; class cassandra_statement { @@ -121,7 +143,19 @@ public: } void bind(size_t i, const cassandra_collection &value) { - auto err = cass_statement_bind_collection(statement, i, value.collection); + auto err = cass_statement_bind_collection(statement, i, value); + assert_ok("cass_statement_bind_collection", err); + } + + void bind(size_t i, const std::vector<string> &&values) { + cassandra_collection c(CassCollectionType::CASS_COLLECTION_TYPE_LIST, values.size()); + + for (const auto &value : values) { + c.append(value); + } + + std::cout << "values.size=" << values.size() << std::endl; + auto err = cass_statement_bind_collection(statement, i, c); assert_ok("cass_statement_bind_collection", err); } diff --git a/http-tests.cpp b/http-tests.cpp new file mode 100644 index 0000000..e912959 --- /dev/null +++ b/http-tests.cpp @@ -0,0 +1,29 @@ +#include "http_support.h" + +#define BOOST_TEST_MODULE "http_tests" + +#include <boost/test/unit_test.hpp> + +using namespace std; +using namespace boost; +using namespace trygvis::http_support; + +BOOST_AUTO_TEST_CASE(root) { + BOOST_ASSERT(matches(vector<string>{""})); +} + +BOOST_AUTO_TEST_CASE(test_matcher) { + param device("device"); + BOOST_ASSERT(matches({"device", "aa:bb:cc:dd:ee:ff"}, "device", device)); + BOOST_ASSERT(device.value == "aa:bb:cc:dd:ee:ff"); +} + +BOOST_AUTO_TEST_CASE(test_matcher_2) { + BOOST_ASSERT(!matches({"device", "aa:bb:cc:dd:ee:ff"})); +} + +BOOST_AUTO_TEST_CASE(test_matcher_3) { + param device("device"); + BOOST_ASSERT(matches({""})); + BOOST_ASSERT(!matches({""}, "device", device)); +} diff --git a/http_support.h b/http_support.h new file mode 100644 index 0000000..f3896f8 --- /dev/null +++ b/http_support.h @@ -0,0 +1,99 @@ +#ifndef MQTT_CASSANDRA_BRIDGE_HTTP_SUPPORT_H +#define MQTT_CASSANDRA_BRIDGE_HTTP_SUPPORT_H + +#include <nghttp2/asio_http2_server.h> +#include <iostream> +#include <string> +#include <boost/algorithm/string.hpp> +#include <boost/algorithm/string/split.hpp> + +namespace trygvis { +namespace http_support { +using namespace std; +using namespace nghttp2::asio_http2::server; + +class param { +public: + explicit param(const string &name) : name(name) { + } + + param(const param &) = delete; + + virtual ~param() { + } + + param operator=(param &) = delete; + + const string name; + string value; +}; + +namespace matcher { + +bool match(string &path, const string &expected_path) { + cout << "match (string), path=" << path << ", expected=" << expected_path << endl; + if (expected_path.length() == 0) { + throw runtime_error("Invalid path: path.length() == 0"); + } + return path == expected_path; +} + +bool match(string &path, const char *expected_path) { + cout << "match (char*), path=" << path << ", expected=" << expected_path << endl; + if (*expected_path == '\0') { + throw runtime_error("Invalid path: path.length() == 0"); + } + return path == expected_path; +} + +bool match(string &path, param &expected) { + cout << "match (param), path=" << path << ", key=" << expected.name << endl; + expected.value = path; + return true; +} + +bool matches(vector<string>::const_iterator paths, vector<string>::const_iterator end) { + return paths == end; +} + +template<typename T, typename... Args> +bool matches(vector<string>::const_iterator paths, vector<string>::const_iterator end, T ¶m, Args &... params) { + // Nothing more to check, but we have parameters. + if (paths == end) { + return false; + } + + auto path = *paths++; + + if (!match(path, param)) { + return false; + } + + return matches(paths, end, params...); +}; + +} // matcher + +bool matches(vector<string> paths) { + cout << "matches(), paths=" << boost::algorithm::join(paths, "/") << endl; + return paths.size() == 1 && paths[0] == ""; +} + +template<typename T, typename... Args> +bool matches(vector<string> paths, T ¶m, Args &... params) { + cout << "matches(...), paths=" << boost::algorithm::join(paths, "/") << endl; +// if (paths.size() == 1 && paths[0] == "") { +// return true; +// } + return matcher::matches(paths.begin(), paths.end(), param, params...); +} + +void method_not_allowed(const request &req, const response &res) { + res.write_head(405); + + res.end("Method not allowed: " + req.method() + "\r\n"); +} + +} +} +#endif @@ -136,39 +136,29 @@ void print_error(CassFuture *future) { cout << "Cassandra error: " << error_message(future) << endl; } -/* - CREATE TABLE sm_by_day ( - device text, - day text, - timestamp timestamp, - sensors list<frozen<tuple<int, int>>>, - PRIMARY KEY ((device, day), timestamp) - ) - */ -CassError insert_into_sm_by_day(CassSession *session, device_measurement &&measurement) { +auto insert_into_sm_by_day(CassSession *session, device_measurement &&measurement) { cassandra_statement q("INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);", 4); q.bind(0, measurement.device); - std::time_t t = std::time(NULL); + std::time_t t = measurement.timestamp; char day[100]; std::strftime(day, sizeof(day), "%Y-%m-%d", std::localtime(&t)); q.bind(1, day); - auto timestamp = std::time(NULL); - q.bind(2, timestamp); + q.bind(2, measurement.timestamp * 1000); cassandra_collection sensors(CASS_COLLECTION_TYPE_LIST, measurement.sensors.size()); for_each(measurement.sensors.cbegin(), measurement.sensors.cend(), [&](auto sensor) { cassandra_tuple tuple(2); tuple.set(0, sensor.sensor); tuple.set(1, sensor.value); - sensors.append_tuple(std::move(tuple)); + sensors.append(std::move(tuple)); }); q.bind(3, sensors); - return wait_for_future(cass_session_execute(session, q.statement)); + return cass_session_execute(session, q.statement); } template<typename Target, typename Source> @@ -189,15 +179,6 @@ boost::optional<Target> flat_map(boost::optional<Source> &a, boost::optional<Tar return f(a.get()); } -//template<typename Target, typename Source> -//boost::optional<Target> flat_map(boost::optional<Source> &a, std::function<boost::optional<Target>(Source)> f) { -// if (!a.is_initialized()) { -// return boost::none; -// } -// -// return f(a.get()); -//} - template<typename Target, typename Source = string> boost::optional<Target> l_c(const Source source) { try { @@ -271,11 +252,11 @@ void on_message(const struct mosquitto_message *message) { cout << "Measurement: " << measurement.str() << endl; if (current_cassandra_session) { - auto rc = insert_into_sm_by_day(current_cassandra_session->session, std::move(measurement)); - - cout << "rc=" << rc << endl; - - assert_ok("wait_for_future", rc); + handle_future(insert_into_sm_by_day(current_cassandra_session->session, std::move(measurement)), [&](auto future) { + cout << "Success!" << endl; + }, [&](auto future, auto err) { + cout << "Failure: " << error_message(future) << endl; + }); } else { cout << "Not connected to Cassandra" << endl; } diff --git a/sm_web_server.cpp b/sm_web_server.cpp new file mode 100644 index 0000000..7ac4e67 --- /dev/null +++ b/sm_web_server.cpp @@ -0,0 +1,238 @@ +#include "cassandra_support.h" +#include "http_support.h" +#include <nghttp2/asio_http2_server.h> +#include <iostream> +#include <string> +#include <boost/program_options.hpp> +#include <boost/algorithm/string.hpp> +#include <boost/algorithm/string/split.hpp> +#include <cxxabi.h> + +using namespace std; +using namespace nghttp2::asio_http2; +using namespace nghttp2::asio_http2::server; +using namespace trygvis::cassandra_support; +using namespace trygvis::http_support; +namespace po = boost::program_options; + +static unique_ptr<cassandra_session> current_cassandra_session; +static string keyspace_name = "soil_moisture"; + +const auto text_plain = header_value{"text/plain"}; +const auto application_json = header_value{"application/json"}; + +string read_string(const CassRow *row, const size_t index) { + const CassValue *value = cass_row_get_column(row, index); + + const char *buf; + size_t len; + auto err = cass_value_get_string(value, &buf, &len); + assert_ok("cass_value_get_string", err); + + return string(buf, len); +} + +cass_int64_t read_value_int64(const CassRow *row, const size_t index) { + const CassValue *value = cass_row_get_column(row, index); + + cass_int64_t data; + auto err = cass_value_get_int64(value, &data); + assert_ok("cass_value_get_int64", err); + + return data; +} + +cass_int32_t read_value_int32(const CassRow *row, const size_t index) { + const CassValue *value = cass_row_get_column(row, index); + + cass_int32_t data; + auto err = cass_value_get_int32(value, &data); + assert_ok("cass_value_get_int32", err); + + return data; +} + +void handle_device_get(const request &req, const response &res, string device) { + if(!current_cassandra_session) { + header_map headers; + headers.emplace("content-type", text_plain); + res.write_head(503, headers); + res.end("No connection to database."); + return; + } + + cout << "handle_device_get(" << device << ");" << endl; + + cassandra_statement stmt("SELECT device, timestamp, sensors FROM sm_by_day WHERE device=? AND day IN ?", 2); + stmt.bind(0, device); + vector<string> days = {"2015-07-10", "2015-07-11", "2015-07-12", "2015-07-13", "2015-07-14", "2015-07-15", "2015-07-16"}; + stmt.bind(1, std::move(days)); + + auto f = cass_session_execute(current_cassandra_session->session, stmt.statement); + handle_future(f, [&](auto future) { + header_map headers; + headers.emplace("content-type", application_json); + res.write_head(200, headers); + + const CassResult *result = cass_future_get_result(future); + size_t count = cass_result_row_count(result); + cout << "row count: " << count << endl; + CassIterator *rows = cass_iterator_from_result(result); + + stringstream buf; + buf << "["; + + bool first = true; + while (cass_iterator_next(rows)) { + const CassRow *row = cass_iterator_get_row(rows); + + string d = read_string(row, 0); + auto timestamp = read_value_int64(row, 1); +// auto sensors = read_string(row, 1); + int value = -1; + + if (!first) { + buf << ","; + } else { + first = false; + } + buf << endl << " {device: '" << d << "', timestamp: '" << timestamp << "'}"; + } + + buf << endl << "]" << endl; + + cass_result_free(result); + cass_iterator_free(rows); + + res.end(buf.str() + "\r\n"); + }, [&](auto future, auto err) { + header_map headers; + headers.emplace("content-type", text_plain); + res.write_head(500, headers); + + stringstream buf; + buf << "Bad shit: " << error_message(future) << "\r\n"; + res.end(buf.str()); + }); +} + +using namespace __cxxabiv1; + +std::string util_demangle(std::string to_demangle) { + int status = 0; + char * buff = __cxxabiv1::__cxa_demangle(to_demangle.c_str(), NULL, NULL, &status); + std::string demangled = buff; + std::free(buff); + return demangled; +} + +void internal_server_error(const response &res, const string &msg) { + header_map headers; + headers.emplace("content-type", text_plain); + res.write_head(500, headers); + + stringstream buf; + buf << "Internal server error: " << msg << "\r\n"; + auto s = buf.str(); + + cout << s << endl; + res.end(s); +} + +int main(int argc, const char *const argv[]) { + string cassandra_cluster; + po::options_description all("Options"); + all.add_options()("cassandra-cluster", po::value<string>(&cassandra_cluster)->default_value("127.0.0.1")); + + po::variables_map vm; + try { + auto parsed = po::parse_command_line(argc, argv, all); + po::store(parsed, vm); + po::notify(vm); + auto unrecognized = po::collect_unrecognized(parsed.options, po::include_positional); + + if (vm.count("help")) { + cerr << all << "\n"; + return EXIT_FAILURE; + } + + if (unrecognized.size()) { + cerr << "Unrecognized option: " << unrecognized.at(0) << "\n"; + return EXIT_FAILURE; + } + + } catch (po::required_option &e) { + cerr << "Missing required option: " << e.get_option_name() << endl; + cerr << all << endl; + } catch (po::unknown_option &e) { + cerr << e.what() << endl; + return EXIT_FAILURE; + } + + CassFuture *connect_future = nullptr; + CassCluster *cluster = cass_cluster_new(); + auto session = make_unique<cassandra_session>(); + + cass_cluster_set_contact_points(cluster, cassandra_cluster.c_str()); + + connect_future = cass_session_connect(session->session, cluster); + + if (cass_future_error_code(connect_future) != CASS_OK) { + string s = error_message(connect_future); + cerr << "Could not connect to Cassandra:" << s << endl; + return EXIT_FAILURE; + } + + cout << "Connected to Cassandra" << endl; + current_cassandra_session = std::move(session); + + execute_query(current_cassandra_session->session, "USE " + keyspace_name); + boost::system::error_code ec; + http2 server; + server.num_threads(4); + + server.handle("/", [](const request &req, const response &res) { + cerr << req.method() << " " << req.uri().path << endl; + + vector<string> paths; + auto &path = req.uri().path; + boost::algorithm::split(paths, path, boost::algorithm::is_any_of("/"), boost::algorithm::token_compress_on); + + if (paths.begin()->size() == 0) { + paths.erase(paths.begin()); + } + + try { + for (auto &p : paths) { + cout << "path ->" << p << "<-" << endl; + } + + param device("device"); + if (matches(paths, "device", device)) { + if (req.method() == "GET") { + handle_device_get(req, res, device.value); + } else { + method_not_allowed(req, res); + } + } else { + res.write_head(404); + res.end("Not found :(\r\n"); + } + } catch (const exception& ex) { + internal_server_error(res, ex.what()); + } catch (const string& ex) { + internal_server_error(res, ex); + } catch (...) { + auto type = util_demangle(__cxa_current_exception_type()->name()); + internal_server_error(res, "Unknown exception, type: " + type); + } + }); + + std::cerr << "Starting server" << endl; + if (server.listen_and_serve(ec, "127.0.0.1", "3000")) { + std::cerr << "error: " << ec.message() << std::endl; + } + std::cerr << "woot?" << endl; + + return EXIT_SUCCESS; +} |