aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt72
-rw-r--r--README.md14
-rw-r--r--cassandra_support.h48
-rw-r--r--http-tests.cpp29
-rw-r--r--http_support.h99
-rw-r--r--main.cpp39
-rw-r--r--sm_web_server.cpp238
7 files changed, 492 insertions, 47 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 74197cb..b9d9691 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1,14 +1,12 @@
cmake_minimum_required(VERSION 3.2)
project(mqtt_cassandra_bridge)
-find_package(Boost COMPONENTS regex system program_options REQUIRED)
-
-add_executable(mqtt_cassandra_bridge main.cpp cassandra_support.h)
-target_link_libraries(mqtt_cassandra_bridge PUBLIC ${Boost_LIBRARIES})
-target_compile_options(mqtt_cassandra_bridge PUBLIC "-std=c++14")
+find_package(Boost COMPONENTS system program_options unit_test_framework REQUIRED)
include(ExternalProject)
+set(SHARED_COMPILE_OPTIONS "-std=c++14")
+
# Cassandra
set(CPP_DRIVER ${CMAKE_CURRENT_BINARY_DIR}/cpp-driver)
ExternalProject_Add(cpp-driver
@@ -17,9 +15,6 @@ ExternalProject_Add(cpp-driver
PREFIX ${CPP_DRIVER}
CMAKE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=<INSTALL_DIR>
)
-target_include_directories(mqtt_cassandra_bridge PRIVATE ${CPP_DRIVER}/include)
-target_link_libraries(mqtt_cassandra_bridge PRIVATE ${CPP_DRIVER}/lib/libcassandra.so)
-add_dependencies(mqtt_cassandra_bridge cpp-driver)
# ble-toys
set(BLE_TOYS ${CMAKE_CURRENT_BINARY_DIR}/ble-toys)
@@ -27,13 +22,68 @@ ExternalProject_Add(ble-toys
GIT_REPOSITORY https://trygvis.io/git/2015/02/ble-toys.git
GIT_TAG 650fb016ce36cfda2e8073764196655ee6a50567
GIT_SUBMODULES json
- BUILD_ALWAYS 0
PREFIX ${BLE_TOYS}
CMAKE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=<INSTALL_DIR>
)
+
+# nghttp2
+set(NGHTTP2 ${CMAKE_CURRENT_BINARY_DIR}/nghttp2)
+ExternalProject_Add(nghttp2
+ URL https://github.com/tatsuhiro-t/nghttp2/releases/download/v1.0.5/nghttp2-1.0.5.tar.xz
+ URL_MD5 ff25d732d79128c4fa426393a635c21e
+ PREFIX ${NGHTTP2}
+ BUILD_ALWAYS 0
+ CONFIGURE_COMMAND cd <SOURCE_DIR> && autoreconf -i && automake && autoconf && ./configure --prefix=<INSTALL_DIR> --enable-asio-lib
+ BUILD_COMMAND cd <SOURCE_DIR> && make
+ INSTALL_COMMAND cd <SOURCE_DIR> && make install
+)
+
+# Mosquitto
+# TODO: proper discovery
+
+# OpenSSL
+# TODO: proper discovery
+
+# mqtt_cassandra_bridge
+add_executable(mqtt_cassandra_bridge main.cpp cassandra_support.h)
+add_dependencies(mqtt_cassandra_bridge cpp-driver)
+## Boost
+target_link_libraries(mqtt_cassandra_bridge PRIVATE ${Boost_LIBRARIES})
+## Cassandra
+target_include_directories(mqtt_cassandra_bridge PRIVATE ${CPP_DRIVER}/include)
+target_link_libraries(mqtt_cassandra_bridge PRIVATE ${CPP_DRIVER}/lib/libcassandra.so)
+## Mosquitto
+target_compile_options(mqtt_cassandra_bridge PUBLIC ${SHARED_COMPILE_OPTIONS})
+target_link_libraries(mqtt_cassandra_bridge PRIVATE mosquitto mosquittopp)
+## Ble toys
add_dependencies(mqtt_cassandra_bridge ble-toys)
target_include_directories(mqtt_cassandra_bridge PRIVATE ${BLE_TOYS}/include)
target_link_libraries(mqtt_cassandra_bridge PRIVATE ${BLE_TOYS}/lib/trygvis/libtrygvis-sensor.a)
-# Mosquitto
-target_link_libraries(mqtt_cassandra_bridge PRIVATE mosquitto mosquittopp)
+# sm_web_server
+add_executable(sm_web_server sm_web_server.cpp cassandra_support.h http_support.h)
+target_compile_options(sm_web_server PUBLIC ${SHARED_COMPILE_OPTIONS})
+## Boost
+target_link_libraries(sm_web_server PRIVATE ${Boost_LIBRARIES})
+## Cassandra
+add_dependencies(sm_web_server cpp-driver)
+target_include_directories(sm_web_server PRIVATE ${CPP_DRIVER}/include)
+target_link_libraries(sm_web_server PRIVATE ${CPP_DRIVER}/lib/libcassandra.so)
+## Nghttp2
+add_dependencies(sm_web_server nghttp2)
+target_include_directories(sm_web_server PRIVATE ${NGHTTP2}/include)
+target_link_libraries(sm_web_server PRIVATE ${NGHTTP2}/lib/libnghttp2_asio.a ${NGHTTP2}/lib/libnghttp2.a)
+## Misc
+target_link_libraries(sm_web_server PRIVATE ssl crypto pthread)
+
+enable_testing()
+add_executable(http-tests http-tests.cpp http_support.h)
+target_compile_options(http-tests PUBLIC ${SHARED_COMPILE_OPTIONS})
+add_dependencies(http-tests nghttp2)
+target_include_directories(http-tests PRIVATE ${NGHTTP2}/include)
+target_link_libraries(http-tests PRIVATE ${NGHTTP2}/lib/libnghttp2_asio.a ${NGHTTP2}/lib/libnghttp2.a)
+target_link_libraries(http-tests PRIVATE ssl crypto pthread)
+target_link_libraries(http-tests PRIVATE ${Boost_LIBRARIES})
+target_compile_definitions(http-tests PUBLIC -DBOOST_TEST_DYN_LINK)
+
+add_test(NAME http-tests COMMAND http-tests)
diff --git a/README.md b/README.md
index 6ce8d37..91681e4 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,17 @@
+# Schema
+
+ CREATE TABLE sm_by_day (
+ device text,
+ day text,
+ timestamp timestamp,
+ sensors list<frozen<tuple<int, int>>>,
+ PRIMARY KEY (
+ (device, day),
+ timestamp
+ )
+ );
+
+
# Create Schema
$ bin/cqlsh
diff --git a/cassandra_support.h b/cassandra_support.h
index 167f69b..b0726c8 100644
--- a/cassandra_support.h
+++ b/cassandra_support.h
@@ -3,17 +3,21 @@
#include <stddef.h>
#include <string>
+#include <algorithm>
#include <cassandra.h>
#include <stdexcept>
+#include <functional>
+#include <vector>
+#include <iostream>
namespace trygvis {
namespace cassandra_support {
using namespace std;
-class cassandra_error : runtime_error {
+class cassandra_error : public runtime_error {
public:
- cassandra_error(const string &context, CassError error) : runtime_error("Cassandra error: context=" + context + ", error=" + to_string((int)error)) {
+ cassandra_error(const string &context, CassError error) : runtime_error("Cassandra error: context=" + context + ", error=" + cass_error_desc(error)) {
}
};
@@ -73,21 +77,39 @@ public:
cass_collection_free(collection);
}
- void append_tuple(cassandra_tuple &&tuple) {
+ void append(const cassandra_tuple &&tuple) {
cass_collection_append_tuple(collection, tuple.tuple);
}
+ void append(const string &&value) {
+ cass_collection_append_string(collection, value.c_str());
+ }
+
+ void append(const string &value) {
+ cass_collection_append_string(collection, value.c_str());
+ }
+
+ operator CassCollection *() const {
+ return collection;
+ };
+
+private:
CassCollection *collection;
};
-CassError wait_for_future(CassFuture *future) {
+void handle_future(CassFuture *future, std::function<void(CassFuture *)> success,
+ std::function<void(CassFuture *, CassError)> error) {
cass_future_wait(future);
CassError rc = cass_future_error_code(future);
- cass_future_free(future);
+ if(rc == CASS_OK) {
+ success(future);
+ } else {
+ error(future, rc);
+ }
- return rc;
+ cass_future_free(future);
};
class cassandra_statement {
@@ -121,7 +143,19 @@ public:
}
void bind(size_t i, const cassandra_collection &value) {
- auto err = cass_statement_bind_collection(statement, i, value.collection);
+ auto err = cass_statement_bind_collection(statement, i, value);
+ assert_ok("cass_statement_bind_collection", err);
+ }
+
+ void bind(size_t i, const std::vector<string> &&values) {
+ cassandra_collection c(CassCollectionType::CASS_COLLECTION_TYPE_LIST, values.size());
+
+ for (const auto &value : values) {
+ c.append(value);
+ }
+
+ std::cout << "values.size=" << values.size() << std::endl;
+ auto err = cass_statement_bind_collection(statement, i, c);
assert_ok("cass_statement_bind_collection", err);
}
diff --git a/http-tests.cpp b/http-tests.cpp
new file mode 100644
index 0000000..e912959
--- /dev/null
+++ b/http-tests.cpp
@@ -0,0 +1,29 @@
+#include "http_support.h"
+
+#define BOOST_TEST_MODULE "http_tests"
+
+#include <boost/test/unit_test.hpp>
+
+using namespace std;
+using namespace boost;
+using namespace trygvis::http_support;
+
+BOOST_AUTO_TEST_CASE(root) {
+ BOOST_ASSERT(matches(vector<string>{""}));
+}
+
+BOOST_AUTO_TEST_CASE(test_matcher) {
+ param device("device");
+ BOOST_ASSERT(matches({"device", "aa:bb:cc:dd:ee:ff"}, "device", device));
+ BOOST_ASSERT(device.value == "aa:bb:cc:dd:ee:ff");
+}
+
+BOOST_AUTO_TEST_CASE(test_matcher_2) {
+ BOOST_ASSERT(!matches({"device", "aa:bb:cc:dd:ee:ff"}));
+}
+
+BOOST_AUTO_TEST_CASE(test_matcher_3) {
+ param device("device");
+ BOOST_ASSERT(matches({""}));
+ BOOST_ASSERT(!matches({""}, "device", device));
+}
diff --git a/http_support.h b/http_support.h
new file mode 100644
index 0000000..f3896f8
--- /dev/null
+++ b/http_support.h
@@ -0,0 +1,99 @@
+#ifndef MQTT_CASSANDRA_BRIDGE_HTTP_SUPPORT_H
+#define MQTT_CASSANDRA_BRIDGE_HTTP_SUPPORT_H
+
+#include <nghttp2/asio_http2_server.h>
+#include <iostream>
+#include <string>
+#include <boost/algorithm/string.hpp>
+#include <boost/algorithm/string/split.hpp>
+
+namespace trygvis {
+namespace http_support {
+using namespace std;
+using namespace nghttp2::asio_http2::server;
+
+class param {
+public:
+ explicit param(const string &name) : name(name) {
+ }
+
+ param(const param &) = delete;
+
+ virtual ~param() {
+ }
+
+ param operator=(param &) = delete;
+
+ const string name;
+ string value;
+};
+
+namespace matcher {
+
+bool match(string &path, const string &expected_path) {
+ cout << "match (string), path=" << path << ", expected=" << expected_path << endl;
+ if (expected_path.length() == 0) {
+ throw runtime_error("Invalid path: path.length() == 0");
+ }
+ return path == expected_path;
+}
+
+bool match(string &path, const char *expected_path) {
+ cout << "match (char*), path=" << path << ", expected=" << expected_path << endl;
+ if (*expected_path == '\0') {
+ throw runtime_error("Invalid path: path.length() == 0");
+ }
+ return path == expected_path;
+}
+
+bool match(string &path, param &expected) {
+ cout << "match (param), path=" << path << ", key=" << expected.name << endl;
+ expected.value = path;
+ return true;
+}
+
+bool matches(vector<string>::const_iterator paths, vector<string>::const_iterator end) {
+ return paths == end;
+}
+
+template<typename T, typename... Args>
+bool matches(vector<string>::const_iterator paths, vector<string>::const_iterator end, T &param, Args &... params) {
+ // Nothing more to check, but we have parameters.
+ if (paths == end) {
+ return false;
+ }
+
+ auto path = *paths++;
+
+ if (!match(path, param)) {
+ return false;
+ }
+
+ return matches(paths, end, params...);
+};
+
+} // matcher
+
+bool matches(vector<string> paths) {
+ cout << "matches(), paths=" << boost::algorithm::join(paths, "/") << endl;
+ return paths.size() == 1 && paths[0] == "";
+}
+
+template<typename T, typename... Args>
+bool matches(vector<string> paths, T &param, Args &... params) {
+ cout << "matches(...), paths=" << boost::algorithm::join(paths, "/") << endl;
+// if (paths.size() == 1 && paths[0] == "") {
+// return true;
+// }
+ return matcher::matches(paths.begin(), paths.end(), param, params...);
+}
+
+void method_not_allowed(const request &req, const response &res) {
+ res.write_head(405);
+
+ res.end("Method not allowed: " + req.method() + "\r\n");
+}
+
+}
+}
+#endif
diff --git a/main.cpp b/main.cpp
index 744f697..d7dbfdc 100644
--- a/main.cpp
+++ b/main.cpp
@@ -136,39 +136,29 @@ void print_error(CassFuture *future) {
cout << "Cassandra error: " << error_message(future) << endl;
}
-/*
- CREATE TABLE sm_by_day (
- device text,
- day text,
- timestamp timestamp,
- sensors list<frozen<tuple<int, int>>>,
- PRIMARY KEY ((device, day), timestamp)
- )
- */
-CassError insert_into_sm_by_day(CassSession *session, device_measurement &&measurement) {
+auto insert_into_sm_by_day(CassSession *session, device_measurement &&measurement) {
cassandra_statement q("INSERT INTO sm_by_day(device, day, timestamp, sensors) VALUES (?, ?, ?, ?);", 4);
q.bind(0, measurement.device);
- std::time_t t = std::time(NULL);
+ std::time_t t = measurement.timestamp;
char day[100];
std::strftime(day, sizeof(day), "%Y-%m-%d", std::localtime(&t));
q.bind(1, day);
- auto timestamp = std::time(NULL);
- q.bind(2, timestamp);
+ q.bind(2, measurement.timestamp * 1000);
cassandra_collection sensors(CASS_COLLECTION_TYPE_LIST, measurement.sensors.size());
for_each(measurement.sensors.cbegin(), measurement.sensors.cend(), [&](auto sensor) {
cassandra_tuple tuple(2);
tuple.set(0, sensor.sensor);
tuple.set(1, sensor.value);
- sensors.append_tuple(std::move(tuple));
+ sensors.append(std::move(tuple));
});
q.bind(3, sensors);
- return wait_for_future(cass_session_execute(session, q.statement));
+ return cass_session_execute(session, q.statement);
}
template<typename Target, typename Source>
@@ -189,15 +179,6 @@ boost::optional<Target> flat_map(boost::optional<Source> &a, boost::optional<Tar
return f(a.get());
}
-//template<typename Target, typename Source>
-//boost::optional<Target> flat_map(boost::optional<Source> &a, std::function<boost::optional<Target>(Source)> f) {
-// if (!a.is_initialized()) {
-// return boost::none;
-// }
-//
-// return f(a.get());
-//}
-
template<typename Target, typename Source = string>
boost::optional<Target> l_c(const Source source) {
try {
@@ -271,11 +252,11 @@ void on_message(const struct mosquitto_message *message) {
cout << "Measurement: " << measurement.str() << endl;
if (current_cassandra_session) {
- auto rc = insert_into_sm_by_day(current_cassandra_session->session, std::move(measurement));
-
- cout << "rc=" << rc << endl;
-
- assert_ok("wait_for_future", rc);
+ handle_future(insert_into_sm_by_day(current_cassandra_session->session, std::move(measurement)), [&](auto future) {
+ cout << "Success!" << endl;
+ }, [&](auto future, auto err) {
+ cout << "Failure: " << error_message(future) << endl;
+ });
} else {
cout << "Not connected to Cassandra" << endl;
}
diff --git a/sm_web_server.cpp b/sm_web_server.cpp
new file mode 100644
index 0000000..7ac4e67
--- /dev/null
+++ b/sm_web_server.cpp
@@ -0,0 +1,238 @@
+#include "cassandra_support.h"
+#include "http_support.h"
+#include <nghttp2/asio_http2_server.h>
+#include <iostream>
+#include <string>
+#include <boost/program_options.hpp>
+#include <boost/algorithm/string.hpp>
+#include <boost/algorithm/string/split.hpp>
+#include <cxxabi.h>
+
+using namespace std;
+using namespace nghttp2::asio_http2;
+using namespace nghttp2::asio_http2::server;
+using namespace trygvis::cassandra_support;
+using namespace trygvis::http_support;
+namespace po = boost::program_options;
+
+static unique_ptr<cassandra_session> current_cassandra_session;
+static string keyspace_name = "soil_moisture";
+
+const auto text_plain = header_value{"text/plain"};
+const auto application_json = header_value{"application/json"};
+
+string read_string(const CassRow *row, const size_t index) {
+ const CassValue *value = cass_row_get_column(row, index);
+
+ const char *buf;
+ size_t len;
+ auto err = cass_value_get_string(value, &buf, &len);
+ assert_ok("cass_value_get_string", err);
+
+ return string(buf, len);
+}
+
+cass_int64_t read_value_int64(const CassRow *row, const size_t index) {
+ const CassValue *value = cass_row_get_column(row, index);
+
+ cass_int64_t data;
+ auto err = cass_value_get_int64(value, &data);
+ assert_ok("cass_value_get_int64", err);
+
+ return data;
+}
+
+cass_int32_t read_value_int32(const CassRow *row, const size_t index) {
+ const CassValue *value = cass_row_get_column(row, index);
+
+ cass_int32_t data;
+ auto err = cass_value_get_int32(value, &data);
+ assert_ok("cass_value_get_int32", err);
+
+ return data;
+}
+
+void handle_device_get(const request &req, const response &res, string device) {
+ if(!current_cassandra_session) {
+ header_map headers;
+ headers.emplace("content-type", text_plain);
+ res.write_head(503, headers);
+ res.end("No connection to database.");
+ return;
+ }
+
+ cout << "handle_device_get(" << device << ");" << endl;
+
+ cassandra_statement stmt("SELECT device, timestamp, sensors FROM sm_by_day WHERE device=? AND day IN ?", 2);
+ stmt.bind(0, device);
+ vector<string> days = {"2015-07-10", "2015-07-11", "2015-07-12", "2015-07-13", "2015-07-14", "2015-07-15", "2015-07-16"};
+ stmt.bind(1, std::move(days));
+
+ auto f = cass_session_execute(current_cassandra_session->session, stmt.statement);
+ handle_future(f, [&](auto future) {
+ header_map headers;
+ headers.emplace("content-type", application_json);
+ res.write_head(200, headers);
+
+ const CassResult *result = cass_future_get_result(future);
+ size_t count = cass_result_row_count(result);
+ cout << "row count: " << count << endl;
+ CassIterator *rows = cass_iterator_from_result(result);
+
+ stringstream buf;
+ buf << "[";
+
+ bool first = true;
+ while (cass_iterator_next(rows)) {
+ const CassRow *row = cass_iterator_get_row(rows);
+
+ string d = read_string(row, 0);
+ auto timestamp = read_value_int64(row, 1);
+// auto sensors = read_string(row, 1);
+ int value = -1;
+
+ if (!first) {
+ buf << ",";
+ } else {
+ first = false;
+ }
+ buf << endl << " {device: '" << d << "', timestamp: '" << timestamp << "'}";
+ }
+
+ buf << endl << "]" << endl;
+
+ cass_result_free(result);
+ cass_iterator_free(rows);
+
+ res.end(buf.str() + "\r\n");
+ }, [&](auto future, auto err) {
+ header_map headers;
+ headers.emplace("content-type", text_plain);
+ res.write_head(500, headers);
+
+ stringstream buf;
+ buf << "Bad shit: " << error_message(future) << "\r\n";
+ res.end(buf.str());
+ });
+}
+
+using namespace __cxxabiv1;
+
+std::string util_demangle(std::string to_demangle) {
+ int status = 0;
+ char * buff = __cxxabiv1::__cxa_demangle(to_demangle.c_str(), NULL, NULL, &status);
+ std::string demangled = buff;
+ std::free(buff);
+ return demangled;
+}
+
+void internal_server_error(const response &res, const string &msg) {
+ header_map headers;
+ headers.emplace("content-type", text_plain);
+ res.write_head(500, headers);
+
+ stringstream buf;
+ buf << "Internal server error: " << msg << "\r\n";
+ auto s = buf.str();
+
+ cout << s << endl;
+ res.end(s);
+}
+
+int main(int argc, const char *const argv[]) {
+ string cassandra_cluster;
+ po::options_description all("Options");
+ all.add_options()("cassandra-cluster", po::value<string>(&cassandra_cluster)->default_value("127.0.0.1"));
+
+ po::variables_map vm;
+ try {
+ auto parsed = po::parse_command_line(argc, argv, all);
+ po::store(parsed, vm);
+ po::notify(vm);
+ auto unrecognized = po::collect_unrecognized(parsed.options, po::include_positional);
+
+ if (vm.count("help")) {
+ cerr << all << "\n";
+ return EXIT_FAILURE;
+ }
+
+ if (unrecognized.size()) {
+ cerr << "Unrecognized option: " << unrecognized.at(0) << "\n";
+ return EXIT_FAILURE;
+ }
+
+ } catch (po::required_option &e) {
+ cerr << "Missing required option: " << e.get_option_name() << endl;
+ cerr << all << endl;
+ } catch (po::unknown_option &e) {
+ cerr << e.what() << endl;
+ return EXIT_FAILURE;
+ }
+
+ CassFuture *connect_future = nullptr;
+ CassCluster *cluster = cass_cluster_new();
+ auto session = make_unique<cassandra_session>();
+
+ cass_cluster_set_contact_points(cluster, cassandra_cluster.c_str());
+
+ connect_future = cass_session_connect(session->session, cluster);
+
+ if (cass_future_error_code(connect_future) != CASS_OK) {
+ string s = error_message(connect_future);
+ cerr << "Could not connect to Cassandra:" << s << endl;
+ return EXIT_FAILURE;
+ }
+
+ cout << "Connected to Cassandra" << endl;
+ current_cassandra_session = std::move(session);
+
+ execute_query(current_cassandra_session->session, "USE " + keyspace_name);
+ boost::system::error_code ec;
+ http2 server;
+ server.num_threads(4);
+
+ server.handle("/", [](const request &req, const response &res) {
+ cerr << req.method() << " " << req.uri().path << endl;
+
+ vector<string> paths;
+ auto &path = req.uri().path;
+ boost::algorithm::split(paths, path, boost::algorithm::is_any_of("/"), boost::algorithm::token_compress_on);
+
+ if (paths.begin()->size() == 0) {
+ paths.erase(paths.begin());
+ }
+
+ try {
+ for (auto &p : paths) {
+ cout << "path ->" << p << "<-" << endl;
+ }
+
+ param device("device");
+ if (matches(paths, "device", device)) {
+ if (req.method() == "GET") {
+ handle_device_get(req, res, device.value);
+ } else {
+ method_not_allowed(req, res);
+ }
+ } else {
+ res.write_head(404);
+ res.end("Not found :(\r\n");
+ }
+ } catch (const exception& ex) {
+ internal_server_error(res, ex.what());
+ } catch (const string& ex) {
+ internal_server_error(res, ex);
+ } catch (...) {
+ auto type = util_demangle(__cxa_current_exception_type()->name());
+ internal_server_error(res, "Unknown exception, type: " + type);
+ }
+ });
+
+ std::cerr << "Starting server" << endl;
+ if (server.listen_and_serve(ec, "127.0.0.1", "3000")) {
+ std::cerr << "error: " << ec.message() << std::endl;
+ }
+ std::cerr << "woot?" << endl;
+
+ return EXIT_SUCCESS;
+}