aboutsummaryrefslogtreecommitdiff
path: root/sensor
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2015-03-22 16:57:09 +0100
committerTrygve Laugstøl <trygvis@inamo.no>2015-03-22 16:57:09 +0100
commitaad1eaa291460509a5cf94092609ae22ebef5494 (patch)
tree72d6308a84449e85b00de84f27f0a613b46ef669 /sensor
parent0dc2cc6503386c809266ad6564ba675803cf8cc7 (diff)
downloadble-toys-aad1eaa291460509a5cf94092609ae22ebef5494.tar.gz
ble-toys-aad1eaa291460509a5cf94092609ae22ebef5494.tar.bz2
ble-toys-aad1eaa291460509a5cf94092609ae22ebef5494.tar.xz
ble-toys-aad1eaa291460509a5cf94092609ae22ebef5494.zip
o Renaming SoilMoistureIo to SensorSample, moving to its own library.
Diffstat (limited to 'sensor')
-rw-r--r--sensor/CMakeLists.txt8
-rw-r--r--sensor/include/SensorSample.h427
-rw-r--r--sensor/main/SensorSample.cpp456
-rw-r--r--sensor/test/SoilMoistureIoTest.cpp48
4 files changed, 939 insertions, 0 deletions
diff --git a/sensor/CMakeLists.txt b/sensor/CMakeLists.txt
new file mode 100644
index 0000000..5d7612f
--- /dev/null
+++ b/sensor/CMakeLists.txt
@@ -0,0 +1,8 @@
+add_library(trygvis-sensor
+ main/SensorSample.cpp)
+
+include_directories("${PROJECT_SOURCE_DIR}/json/src")
+include_directories(include)
+
+# Boost
+find_package(Boost COMPONENTS regex system program_options REQUIRED)
diff --git a/sensor/include/SensorSample.h b/sensor/include/SensorSample.h
new file mode 100644
index 0000000..386296a
--- /dev/null
+++ b/sensor/include/SensorSample.h
@@ -0,0 +1,427 @@
+#ifndef SOIL_MOISTURE_IO_H
+#define SOIL_MOISTURE_IO_H
+
+#include <ostream>
+#include <vector>
+#include <map>
+#include <map>
+#include <memory>
+#include <boost/asio/buffer.hpp>
+#include <boost/optional.hpp>
+#include <boost/lexical_cast.hpp>
+#include <functional>
+#include <mutex>
+
+// TODO: rename to trygvis::sample
+namespace trygvis {
+namespace soil_moisture {
+
+using namespace std;
+using namespace boost::asio;
+
+template<typename A>
+using o = boost::optional<A>;
+
+enum class sample_format_type {
+ AUTO,
+ CSV,
+ KEY_VALUE,
+ JSON,
+ SQL,
+ RRD,
+};
+
+string to_string(const sample_format_type &arg);
+
+std::ostream& operator<<(std::ostream& os, sample_format_type const& type);
+
+std::istream& operator>>(std::istream& is, sample_format_type& type);
+
+class SampleStreamParser;
+
+class SampleOutputStream;
+
+class KeyDictionary;
+
+class SampleKey;
+
+// TODO: rename to open_sample_stream_parser
+unique_ptr<SampleStreamParser> open_sample_input_stream(
+ shared_ptr<SampleOutputStream> output,
+ KeyDictionary &dict,
+ sample_format_type type = sample_format_type::AUTO);
+
+class sample_output_stream_option {
+public:
+ virtual ~sample_output_stream_option() {
+ };
+};
+
+class output_fields : public sample_output_stream_option {
+public:
+// output_fields() {
+// }
+//
+// output_fields(std::vector<string>::iterator begin, std::vector<string>::iterator end) :
+// fields(begin, end) {
+// }
+
+ ~output_fields() {
+ }
+
+ vector<string> fields;
+};
+
+
+class timestamp_field : public sample_output_stream_option {
+public:
+ timestamp_field(string name) : name(name) {
+ }
+
+ ~timestamp_field() {
+ }
+
+ string name;
+};
+
+unique_ptr<SampleOutputStream> open_sample_output_stream(
+ shared_ptr<ostream> output,
+ KeyDictionary &dict,
+ sample_format_type type,
+ vector<sample_output_stream_option *> options);
+
+static inline
+unique_ptr<SampleOutputStream> open_sample_output_stream(
+ shared_ptr<ostream> output,
+ KeyDictionary &dict,
+ sample_format_type type) {
+ return open_sample_output_stream(output, dict, type);
+}
+
+class ThreadSafeSampleOutputStream;
+
+static inline
+unique_ptr<ThreadSafeSampleOutputStream> thread_safe_sample_output_stream(unique_ptr<SampleOutputStream> underlying) {
+ return make_unique<ThreadSafeSampleOutputStream>(move(underlying));
+};
+
+class sample_exception : public runtime_error {
+public:
+ sample_exception(const string &what) : runtime_error(what) {
+ }
+};
+
+class KeyDictionary;
+
+using SampleKeyVector = vector<SampleKey *>;
+using SampleKeyIndex = SampleKeyVector::size_type;
+
+struct SampleKey {
+private:
+ SampleKey(const SampleKey& that) = delete;
+ SampleKey(SampleKeyIndex index, const string &name) : index(index), name(name) {
+ if (name.length() == 0) {
+ throw sample_exception("Bad sample key.");
+ }
+ }
+
+public:
+ friend class KeyDictionary;
+
+ inline
+ bool operator==(const SampleKey &that) const {
+ return name == that.name;
+ }
+
+ const SampleKeyIndex index;
+ const string name;
+};
+
+class KeyDictionary {
+public:
+ KeyDictionary() {
+ }
+
+ ~KeyDictionary() {
+ std::for_each(keys.begin(), keys.end(), std::default_delete<SampleKey>());
+ }
+ KeyDictionary(KeyDictionary& that) = delete;
+
+ SampleKey *indexOf(const string key) {
+ SampleKeyIndex i = 0;
+ for (auto ptr = keys.cbegin(); ptr != keys.cend(); ptr++, i++) {
+ if ((*ptr)->name == key) {
+ return *ptr;
+ }
+ }
+
+ i = keys.size();
+ auto sample_key = new SampleKey(i, key);
+ keys.push_back(sample_key);
+
+ return sample_key;
+ }
+
+ SampleKey *at(SampleKeyIndex i) const {
+ if (i >= keys.size()) {
+ throw sample_exception("Out of bounds");
+ }
+
+ return keys.at(i);
+ }
+
+ vector<SampleKey *> findIndexes(SampleKeyVector &keys) {
+ vector<SampleKey *> indexes;
+
+ for (auto &key: keys) {
+ auto index = indexOf(key->name);
+ indexes.push_back(index);
+ }
+
+ return indexes;
+ }
+
+ inline
+ SampleKeyVector::const_iterator end() const {
+ return keys.cend();
+ }
+
+ inline
+ SampleKeyVector::const_iterator begin() const {
+ return keys.cbegin();
+ }
+
+// string nameOf(SampleKeyIndex index) {
+// return keys.at(index).name;
+// }
+
+ inline
+ SampleKeyVector::size_type size() const {
+ return keys.size();
+ }
+
+ inline
+ bool empty() const {
+ return keys.empty();
+ }
+
+private:
+ SampleKeyVector keys;
+};
+
+class SampleRecord {
+public:
+ typedef vector<o<string>> vec;
+
+ SampleRecord(KeyDictionary &dict) : dict(dict) {
+ }
+
+ SampleRecord(KeyDictionary &dict, vec values)
+ : dict(dict), values(values) {
+ }
+
+ inline
+ vec::const_iterator cbegin() const {
+ return values.cbegin();
+ }
+
+ inline
+ vec::const_iterator cend() const {
+ return values.cend();
+ }
+
+ inline
+ bool empty() const {
+ return values.empty();
+ }
+
+ const o<string> at(const SampleKey *key) const {
+ SampleKeyIndex index = key->index;
+ if (index >= values.size()) {
+ return o<string>();
+ }
+
+ return values.at(index);
+ }
+
+ void set(const SampleKey *key, const std::string &value) {
+ values.resize(max(values.size(), key->index + 1));
+
+ values.at(key->index).reset(value);
+ }
+
+ template<class A>
+ const o<A> lexical_at(const SampleKey *key) const {
+ auto value = at(key);
+
+ if (!value) {
+ return o<A>();
+ }
+
+ return o<A>(boost::lexical_cast<A>(value.get()));
+ }
+
+ string to_string() const {
+ SampleKeyIndex i = 0;
+ string s;
+ for (auto ptr = values.begin(); ptr != values.end(); ptr++, i++) {
+ auto o = *ptr;
+
+ if (!o) {
+ continue;
+ }
+
+ auto value = o.get();
+
+ s += dict.at(i)->name + " = " + value + ", ";
+ }
+ return s;
+ }
+
+ KeyDictionary &dict;
+private:
+ vec values;
+};
+
+class SampleOutputStream {
+public:
+ virtual void write(SampleRecord const &sample) = 0;
+};
+
+class VectorSampleOutputStream : public SampleOutputStream {
+
+public:
+ virtual void write(SampleRecord const &sample) override;
+
+public:
+ vector<SampleRecord> samples;
+};
+
+class ThreadSafeSampleOutputStream : public SampleOutputStream {
+public:
+ ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying);
+
+ ~ThreadSafeSampleOutputStream() {
+ }
+
+ void write(SampleRecord const &sample) override;
+
+private:
+ unique_ptr<SampleOutputStream> underlying;
+ std::mutex mutex;
+};
+
+class CsvSampleOutputStream : public SampleOutputStream {
+public:
+ CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict);
+
+ void write(SampleRecord const &sample);
+
+ const KeyDictionary &getDict() {
+ return dict;
+ }
+
+private:
+ void writeHeader();
+
+ KeyDictionary &dict;
+ shared_ptr<ostream> stream;
+ bool headerWritten;
+};
+
+class JsonSampleOutputStream : public SampleOutputStream {
+public:
+ JsonSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict);
+
+ void write(SampleRecord const &sample) override;
+
+private:
+ KeyDictionary &dict;
+ shared_ptr<ostream> stream;
+};
+
+class KeyValueSampleOutputStream : public SampleOutputStream {
+public:
+ KeyValueSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict);
+
+ void write(SampleRecord const &sample) override;
+
+private:
+ KeyDictionary &dict;
+ shared_ptr<ostream> stream;
+};
+
+class RrdSampleOutputStream : public SampleOutputStream {
+public:
+ RrdSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey *timestamp_key, o<output_fields *> output_fields);
+
+ void write(SampleRecord const &sample) override;
+
+private:
+ vector<SampleKey *> keys;
+ shared_ptr<ostream> stream;
+ const SampleKey *timestamp_key;
+};
+
+class SqlSampleOutputStream : public SampleOutputStream {
+public:
+ SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name);
+
+ void write(SampleRecord const &sample) override;
+
+private:
+ KeyDictionary &dict;
+ shared_ptr<ostream> stream;
+ const string table_name;
+};
+
+class SampleStreamParser {
+public:
+ // TODO: return number of samples found for progress indication?
+ virtual void process(mutable_buffers_1 buffer) = 0;
+
+ virtual sample_format_type type() {
+ return type_;
+ }
+
+protected:
+ sample_format_type type_;
+
+ SampleStreamParser(const sample_format_type type) : type_(type) {
+ }
+};
+
+class KeyValueSampleStreamParser : public SampleStreamParser {
+
+public:
+ KeyValueSampleStreamParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) :
+ SampleStreamParser(sample_format_type::CSV), output(output), dict(dict),
+ line(make_shared<vector<uint8_t>>()) {
+ }
+
+ void process(mutable_buffers_1 buffer) override;
+
+private:
+ void process_line(shared_ptr<vector<uint8_t>> packet);
+
+ static const uint8_t packet_delimiter = '\n';
+ KeyDictionary &dict;
+ shared_ptr<SampleOutputStream> output;
+ shared_ptr<vector<uint8_t>> line;
+};
+
+class AutoSampleParser : public SampleStreamParser {
+public:
+ AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict);
+
+private:
+ unique_ptr<SampleStreamParser> parser;
+ unique_ptr<KeyValueSampleStreamParser> keyValueParser;
+public:
+ virtual void process(mutable_buffers_1 buffer);
+};
+
+}
+}
+
+#endif
diff --git a/sensor/main/SensorSample.cpp b/sensor/main/SensorSample.cpp
new file mode 100644
index 0000000..6ec6dfc
--- /dev/null
+++ b/sensor/main/SensorSample.cpp
@@ -0,0 +1,456 @@
+#include "SensorSample.h"
+
+#include "json.hpp"
+#include <set>
+#include <boost/regex.hpp>
+#include <chrono>
+
+namespace trygvis {
+namespace soil_moisture {
+
+using namespace std;
+using json = nlohmann::json;
+
+void VectorSampleOutputStream::write(SampleRecord const &sample) {
+ if (sample.empty()) {
+ return;
+ }
+
+ samples.emplace_back(sample);
+}
+
+CsvSampleOutputStream::CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict)
+ : stream(move(stream)), headerWritten(false), dict(dict) {
+}
+
+void CsvSampleOutputStream::write(SampleRecord const &sample) {
+ // Skip empty records
+ if (sample.empty()) {
+ return;
+ }
+
+ // Build the dict with the keys from the first sample.
+ if (dict.empty()) {
+ SampleKeyIndex index = 0;
+ auto ptr = sample.cbegin();
+ while (ptr != sample.cend()) {
+ auto o = *ptr;
+
+ if (o) {
+ auto name = sample.dict.at(index)->name;
+ dict.indexOf(name);
+ }
+
+ ptr++;
+ index++;
+ }
+ }
+
+ if (!headerWritten) {
+ writeHeader();
+ headerWritten = true;
+ }
+
+ auto &s = *stream.get();
+
+ auto it = dict.begin();
+ while (it != dict.end()) {
+ if (it != dict.begin()) {
+ s << ",";
+ }
+
+ auto key = *it++;
+ auto sampleKey = sample.dict.indexOf(key->name);
+ auto o = sample.at(sampleKey);
+
+ if (o) {
+ s << o.get();
+ }
+ }
+
+ s << endl;
+}
+
+void CsvSampleOutputStream::writeHeader() {
+ auto &s = *stream.get();
+
+ auto i = dict.begin();
+ while (i != dict.end()) {
+ s << (*i)->name;
+
+ i++;
+
+ if (i != dict.end()) {
+ s << ",";
+ }
+ }
+
+ s << endl;
+}
+
+JsonSampleOutputStream::JsonSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict) :
+ dict(dict), stream(move(stream)) {
+}
+
+void JsonSampleOutputStream::write(SampleRecord const &sample) {
+ // Skip empty records
+ if (sample.empty()) {
+ return;
+ }
+
+ json doc({});
+
+ if (!dict.empty()) {
+ for (auto &key: dict) {
+ auto sampleKey = sample.dict.indexOf(key->name);
+
+ auto value = sample.at(sampleKey);
+
+ if (value) {
+ doc[key->name] = value.get();
+ }
+ }
+ } else {
+ for (auto &sampleKey: sample.dict) {
+ auto o = sample.at(sampleKey);
+
+ if (o) {
+ // Make sure that the key is registered in the dictionary
+ dict.indexOf(sampleKey->name);
+ doc[sampleKey->name] = o.get();
+ }
+ }
+ }
+
+ *stream.get() << doc << endl;
+}
+
+KeyValueSampleOutputStream::KeyValueSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict) :
+ dict(dict), stream(move(stream)) {
+}
+
+void KeyValueSampleOutputStream::write(SampleRecord const &sample) {
+ // Skip empty records
+ if (sample.empty()) {
+ return;
+ }
+
+ auto &s = *stream.get();
+
+ bool first = true;
+ if (!dict.empty()) {
+ for (auto &key: dict) {
+ auto sampleKey = sample.dict.indexOf(key->name);
+
+ auto value = sample.at(sampleKey);
+
+ if (value) {
+ if (first) {
+ first = false;
+ } else {
+ s << ", ";
+ }
+ s << key->name << "=" << value.get();
+ }
+ }
+ } else {
+ for (auto &sampleKey: sample.dict) {
+ auto o = sample.at(sampleKey);
+
+ if (o) {
+ if (first) {
+ first = false;
+ } else {
+ s << ", ";
+ }
+ // Make sure that the key is registered in the dictionary
+ dict.indexOf(sampleKey->name);
+ s << sampleKey->name << "=" << o.get();
+ }
+ }
+ }
+
+ *stream.get() << endl;
+}
+
+RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey* timestamp_key, o<output_fields *> output_fields) :
+ stream(move(stream)), timestamp_key(timestamp_key) {
+ if (output_fields) {
+ for (auto field : output_fields.get()->fields) {
+ keys.emplace_back(dict.indexOf(field));
+ }
+ } else {
+ for (auto key : dict) {
+ keys.emplace_back(key);
+ }
+ }
+}
+
+void RrdSampleOutputStream::write(SampleRecord const &sample) {
+ // Skip empty records
+ if (sample.empty()) {
+ return;
+ }
+
+ auto &s = *stream.get();
+
+ auto timestampO = sample.at(timestamp_key);
+
+ if (!timestampO) {
+ return;
+ }
+
+ auto timestamp = timestampO.get();
+
+ s << timestamp;
+
+ bool first = true;
+ for (auto &key: keys) {
+ if (key == timestamp_key) {
+ continue;
+ }
+
+ auto value = sample.at(key);
+
+ if (first) {
+ s << "@";
+ first = false;
+ } else {
+ s << ":";
+ }
+
+ s << (value ? value.get() : "U");
+ }
+
+ *stream.get() << endl;
+}
+
+SqlSampleOutputStream::SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name) :
+ dict(dict), stream(move(stream)), table_name(table_name) {
+}
+
+void SqlSampleOutputStream::write(SampleRecord const &values) {
+ throw sample_exception("deimplemented");
+
+// string fs, vs;
+//
+// fs.reserve(1024);
+// vs.reserve(1024);
+//
+// if (filter_fields) {
+// auto i = fields.begin();
+//
+// while (i != fields.end()) {
+// auto field = *i;
+//
+// fs += field;
+//
+// auto value = values.find(field);
+//
+// if (value != values.end()) {
+// vs += "'" + value->second + "'";
+// } else {
+// vs += "NULL";
+// }
+//
+// i++;
+//
+// if (i != fields.end()) {
+// fs += ",";
+// vs += ",";
+// }
+// }
+// } else {
+// auto i = values.begin();
+// while (i != values.end()) {
+// auto v = *i++;
+//
+// fs += v.first;
+// vs += "'" + v.second + "'";
+//
+// if (i != values.end()) {
+// fs += ",";
+// vs += ",";
+// }
+// }
+// }
+//
+// (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl;
+}
+
+void KeyValueSampleStreamParser::process(mutable_buffers_1 buffer) {
+
+ size_t size = buffer_size(buffer);
+
+ if (size == 0 && line->size()) {
+ process_line(line);
+ line = make_shared<vector<uint8_t>>();
+ return;
+ }
+
+ auto data = boost::asio::buffer_cast<const uint8_t *>(buffer);
+
+ for (int i = 0; i < size; i++) {
+ uint8_t b = data[i];
+
+ if (b == packet_delimiter) {
+ process_line(line);
+ line = make_shared<vector<uint8_t>>();
+ } else {
+ line->push_back(b);
+ }
+ }
+
+}
+
+void KeyValueSampleStreamParser::process_line(shared_ptr<vector<uint8_t>> packet) {
+ auto timestamp = std::chrono::system_clock::now().time_since_epoch().count();
+ auto s = std::string((char *) packet->data(), packet->size());
+
+ static const boost::regex e("([#_a-zA-Z0-9]+) *= *([0-9]+)");
+
+ auto start = s.cbegin();
+ auto end = s.cend();
+ boost::match_results<std::string::const_iterator> what;
+ boost::match_flag_type flags = boost::match_default;
+
+ SampleRecord sample(dict);
+
+ while (regex_search(start, end, what, e, flags)) {
+ auto name = static_cast<string>(what[1]);
+ auto value = static_cast<string>(what[2]);
+ start = what[0].second;
+
+ auto key = dict.indexOf(name);
+ sample.set(key, value);
+
+ flags |= boost::match_prev_avail;
+ flags |= boost::match_not_bob;
+ }
+
+ output->write(sample);
+}
+
+AutoSampleParser::AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) :
+ SampleStreamParser(sample_format_type::AUTO), keyValueParser(new KeyValueSampleStreamParser(output, dict)) {
+ // Directly select the parser now until we have more than one parser
+ parser = std::move(keyValueParser);
+ type_ = sample_format_type::KEY_VALUE;
+}
+
+void AutoSampleParser::process(mutable_buffers_1 buffer) {
+ if (parser) {
+ parser->process(buffer);
+ } else {
+ throw runtime_error("Not implemented yet");
+ }
+}
+
+string to_string(const sample_format_type &arg) {
+ if (arg == sample_format_type::AUTO)
+ return "auto";
+ else if (arg == sample_format_type::CSV)
+ return "csv";
+ else if (arg == sample_format_type::JSON)
+ return "json";
+ else if (arg == sample_format_type::KEY_VALUE)
+ return "key-value";
+ else if (arg == sample_format_type::SQL)
+ return "sql";
+ else if (arg == sample_format_type::RRD)
+ return "rrd";
+ else
+ return "unknown";
+}
+
+std::ostream& operator<<(std::ostream& os, sample_format_type const& type) {
+ return os << to_string(type);
+}
+
+std::istream& operator>>(std::istream& is, sample_format_type& type) {
+ string s;
+ is >> s;
+
+ if (s == "auto") {
+ type = sample_format_type::AUTO;
+ } else if (s == "csv") {
+ type = sample_format_type::CSV;
+ } else if (s == "key-value") {
+ type = sample_format_type::KEY_VALUE;
+ } else if (s == "json") {
+ type = sample_format_type::JSON;
+ } else if (s == "sql") {
+ type = sample_format_type::SQL;
+ } else if (s == "rrd") {
+ type = sample_format_type::RRD;
+ }
+
+ return is;
+}
+
+unique_ptr<SampleStreamParser> open_sample_input_stream(
+ shared_ptr<SampleOutputStream> output,
+ KeyDictionary &dict,
+ sample_format_type type) {
+ if (type == sample_format_type::KEY_VALUE) {
+ return make_unique<KeyValueSampleStreamParser>(output, dict);
+ } else if (type == sample_format_type::AUTO) {
+ return make_unique<AutoSampleParser>(output, dict);
+ } else {
+ throw sample_exception("No parser for format type: " + to_string(type));
+ }
+}
+
+template<typename T>
+o<T *> find_option(vector<sample_output_stream_option *> &options) {
+ for (sample_output_stream_option *& option : options) {
+ T *x = dynamic_cast<T *>(option);
+
+ if (x != nullptr) {
+ return o<T *>(x);
+ }
+ }
+
+ return o<T *>();
+}
+
+unique_ptr<SampleOutputStream> open_sample_output_stream(
+ shared_ptr<ostream> output,
+ KeyDictionary &dict,
+ sample_format_type type,
+ vector<sample_output_stream_option *> options) {
+
+ if (type == sample_format_type::CSV) {
+ return make_unique<CsvSampleOutputStream>(output, dict);
+ } else if (type == sample_format_type::KEY_VALUE) {
+ return make_unique<KeyValueSampleOutputStream>(output, dict);
+ } else if (type == sample_format_type::JSON) {
+ return make_unique<JsonSampleOutputStream>(output, dict);
+ } else if (type == sample_format_type::RRD) {
+ o<output_fields *> of = find_option<output_fields>(options);
+
+ o<timestamp_field *> tsf = find_option<timestamp_field>(options);
+
+ auto timestamp_key = dict.indexOf(tsf ? tsf.get()->name : "timestamp");
+
+ return make_unique<RrdSampleOutputStream>(output, dict, timestamp_key, of);
+// } else if (type == sample_format_type::SQL) {
+// return make_unique<SqlSampleOutputStream>(dict, move(output), table_name);
+ } else {
+ throw sample_exception("No writer for format type: " + to_string(type));
+ }
+}
+
+//template<typename T>
+ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying) : underlying(move(underlying)) {
+}
+
+//template<typename T>
+void ThreadSafeSampleOutputStream::write(SampleRecord const &sample) {
+ std::unique_lock<std::mutex> lock(mutex);
+
+ underlying->write(sample);
+}
+
+}
+}
diff --git a/sensor/test/SoilMoistureIoTest.cpp b/sensor/test/SoilMoistureIoTest.cpp
new file mode 100644
index 0000000..574885c
--- /dev/null
+++ b/sensor/test/SoilMoistureIoTest.cpp
@@ -0,0 +1,48 @@
+#include "SensorSample.h"
+
+#define BOOST_TEST_MODULE "SoilMoistureIoTest"
+
+#include <boost/test/unit_test.hpp>
+
+using namespace trygvis::soil_moisture;
+
+BOOST_AUTO_TEST_CASE(key_value_parser) {
+ KeyDictionary dict;
+
+ auto buffer = make_shared<VectorSampleOutputStream>();
+
+ auto parser = new KeyValueSampleStreamParser(buffer, dict);
+
+ char data[] = "a=1, b=2, c=3\n";
+ parser->process(boost::asio::buffer(data, sizeof(data)));
+ BOOST_CHECK_EQUAL(buffer->samples.size(), 1);
+ BOOST_CHECK_EQUAL(dict.size(), 3);
+ auto it = dict.begin();
+ BOOST_CHECK_EQUAL((*it)->name, "a");
+ BOOST_CHECK_EQUAL((*it++)->index, 0);
+ BOOST_CHECK_EQUAL((*it)->name, "b");
+ BOOST_CHECK_EQUAL((*it++)->index, 1);
+ BOOST_CHECK_EQUAL((*it)->name, "c");
+ BOOST_CHECK_EQUAL((*it++)->index, 2);
+}
+
+BOOST_AUTO_TEST_CASE(key_value_parser_with_custom_dict) {
+ KeyDictionary dict;
+
+ dict.indexOf("c");
+ dict.indexOf("b");
+
+ auto buffer = make_shared<VectorSampleOutputStream>();
+
+ auto parser = new KeyValueSampleStreamParser(buffer, dict);
+
+ char data[] = "a=1, b=2, c=3\n";
+ parser->process(boost::asio::buffer(data, sizeof(data)));
+ BOOST_CHECK_EQUAL(buffer->samples.size(), 1);
+ BOOST_CHECK_EQUAL(dict.size(), 3);
+ auto it = dict.begin();
+ BOOST_CHECK_EQUAL((*it)->name, "c");
+ BOOST_CHECK_EQUAL((*it++)->index, 0);
+ BOOST_CHECK_EQUAL((*it)->name, "b");
+ BOOST_CHECK_EQUAL((*it++)->index, 1);
+}