aboutsummaryrefslogtreecommitdiff
path: root/apps/SoilMoistureIo.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'apps/SoilMoistureIo.cpp')
-rw-r--r--apps/SoilMoistureIo.cpp456
1 files changed, 0 insertions, 456 deletions
diff --git a/apps/SoilMoistureIo.cpp b/apps/SoilMoistureIo.cpp
deleted file mode 100644
index 1d9281b..0000000
--- a/apps/SoilMoistureIo.cpp
+++ /dev/null
@@ -1,456 +0,0 @@
-#include "SoilMoistureIo.h"
-
-#include "json.hpp"
-#include <set>
-#include <boost/regex.hpp>
-#include <chrono>
-
-namespace trygvis {
-namespace soil_moisture {
-
-using namespace std;
-using json = nlohmann::json;
-
-void VectorSampleOutputStream::write(SampleRecord const &sample) {
- if (sample.empty()) {
- return;
- }
-
- samples.emplace_back(sample);
-}
-
-CsvSampleOutputStream::CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict)
- : stream(move(stream)), headerWritten(false), dict(dict) {
-}
-
-void CsvSampleOutputStream::write(SampleRecord const &sample) {
- // Skip empty records
- if (sample.empty()) {
- return;
- }
-
- // Build the dict with the keys from the first sample.
- if (dict.empty()) {
- SampleKeyIndex index = 0;
- auto ptr = sample.cbegin();
- while (ptr != sample.cend()) {
- auto o = *ptr;
-
- if (o) {
- auto name = sample.dict.at(index)->name;
- dict.indexOf(name);
- }
-
- ptr++;
- index++;
- }
- }
-
- if (!headerWritten) {
- writeHeader();
- headerWritten = true;
- }
-
- auto &s = *stream.get();
-
- auto it = dict.begin();
- while (it != dict.end()) {
- if (it != dict.begin()) {
- s << ",";
- }
-
- auto key = *it++;
- auto sampleKey = sample.dict.indexOf(key->name);
- auto o = sample.at(sampleKey);
-
- if (o) {
- s << o.get();
- }
- }
-
- s << endl;
-}
-
-void CsvSampleOutputStream::writeHeader() {
- auto &s = *stream.get();
-
- auto i = dict.begin();
- while (i != dict.end()) {
- s << (*i)->name;
-
- i++;
-
- if (i != dict.end()) {
- s << ",";
- }
- }
-
- s << endl;
-}
-
-JsonSampleOutputStream::JsonSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict) :
- dict(dict), stream(move(stream)) {
-}
-
-void JsonSampleOutputStream::write(SampleRecord const &sample) {
- // Skip empty records
- if (sample.empty()) {
- return;
- }
-
- json doc({});
-
- if (!dict.empty()) {
- for (auto &key: dict) {
- auto sampleKey = sample.dict.indexOf(key->name);
-
- auto value = sample.at(sampleKey);
-
- if (value) {
- doc[key->name] = value.get();
- }
- }
- } else {
- for (auto &sampleKey: sample.dict) {
- auto o = sample.at(sampleKey);
-
- if (o) {
- // Make sure that the key is registered in the dictionary
- dict.indexOf(sampleKey->name);
- doc[sampleKey->name] = o.get();
- }
- }
- }
-
- *stream.get() << doc << endl;
-}
-
-KeyValueSampleOutputStream::KeyValueSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict) :
- dict(dict), stream(move(stream)) {
-}
-
-void KeyValueSampleOutputStream::write(SampleRecord const &sample) {
- // Skip empty records
- if (sample.empty()) {
- return;
- }
-
- auto &s = *stream.get();
-
- bool first = true;
- if (!dict.empty()) {
- for (auto &key: dict) {
- auto sampleKey = sample.dict.indexOf(key->name);
-
- auto value = sample.at(sampleKey);
-
- if (value) {
- if (first) {
- first = false;
- } else {
- s << ", ";
- }
- s << key->name << "=" << value.get();
- }
- }
- } else {
- for (auto &sampleKey: sample.dict) {
- auto o = sample.at(sampleKey);
-
- if (o) {
- if (first) {
- first = false;
- } else {
- s << ", ";
- }
- // Make sure that the key is registered in the dictionary
- dict.indexOf(sampleKey->name);
- s << sampleKey->name << "=" << o.get();
- }
- }
- }
-
- *stream.get() << endl;
-}
-
-RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey* timestamp_key, o<output_fields *> output_fields) :
- stream(move(stream)), timestamp_key(timestamp_key) {
- if (output_fields) {
- for (auto field : output_fields.get()->fields) {
- keys.emplace_back(dict.indexOf(field));
- }
- } else {
- for (auto key : dict) {
- keys.emplace_back(key);
- }
- }
-}
-
-void RrdSampleOutputStream::write(SampleRecord const &sample) {
- // Skip empty records
- if (sample.empty()) {
- return;
- }
-
- auto &s = *stream.get();
-
- auto timestampO = sample.at(timestamp_key);
-
- if (!timestampO) {
- return;
- }
-
- auto timestamp = timestampO.get();
-
- s << timestamp;
-
- bool first = true;
- for (auto &key: keys) {
- if (key == timestamp_key) {
- continue;
- }
-
- auto value = sample.at(key);
-
- if (first) {
- s << "@";
- first = false;
- } else {
- s << ":";
- }
-
- s << (value ? value.get() : "U");
- }
-
- *stream.get() << endl;
-}
-
-SqlSampleOutputStream::SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name) :
- dict(dict), stream(move(stream)), table_name(table_name) {
-}
-
-void SqlSampleOutputStream::write(SampleRecord const &values) {
- throw sample_exception("deimplemented");
-
-// string fs, vs;
-//
-// fs.reserve(1024);
-// vs.reserve(1024);
-//
-// if (filter_fields) {
-// auto i = fields.begin();
-//
-// while (i != fields.end()) {
-// auto field = *i;
-//
-// fs += field;
-//
-// auto value = values.find(field);
-//
-// if (value != values.end()) {
-// vs += "'" + value->second + "'";
-// } else {
-// vs += "NULL";
-// }
-//
-// i++;
-//
-// if (i != fields.end()) {
-// fs += ",";
-// vs += ",";
-// }
-// }
-// } else {
-// auto i = values.begin();
-// while (i != values.end()) {
-// auto v = *i++;
-//
-// fs += v.first;
-// vs += "'" + v.second + "'";
-//
-// if (i != values.end()) {
-// fs += ",";
-// vs += ",";
-// }
-// }
-// }
-//
-// (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl;
-}
-
-void KeyValueSampleStreamParser::process(mutable_buffers_1 buffer) {
-
- size_t size = buffer_size(buffer);
-
- if (size == 0 && line->size()) {
- process_line(line);
- line = make_shared<vector<uint8_t>>();
- return;
- }
-
- auto data = boost::asio::buffer_cast<const uint8_t *>(buffer);
-
- for (int i = 0; i < size; i++) {
- uint8_t b = data[i];
-
- if (b == packet_delimiter) {
- process_line(line);
- line = make_shared<vector<uint8_t>>();
- } else {
- line->push_back(b);
- }
- }
-
-}
-
-void KeyValueSampleStreamParser::process_line(shared_ptr<vector<uint8_t>> packet) {
- auto timestamp = std::chrono::system_clock::now().time_since_epoch().count();
- auto s = std::string((char *) packet->data(), packet->size());
-
- static const boost::regex e("([#_a-zA-Z0-9]+) *= *([0-9]+)");
-
- auto start = s.cbegin();
- auto end = s.cend();
- boost::match_results<std::string::const_iterator> what;
- boost::match_flag_type flags = boost::match_default;
-
- SampleRecord sample(dict);
-
- while (regex_search(start, end, what, e, flags)) {
- auto name = static_cast<string>(what[1]);
- auto value = static_cast<string>(what[2]);
- start = what[0].second;
-
- auto key = dict.indexOf(name);
- sample.set(key, value);
-
- flags |= boost::match_prev_avail;
- flags |= boost::match_not_bob;
- }
-
- output->write(sample);
-}
-
-AutoSampleParser::AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) :
- SampleStreamParser(sample_format_type::AUTO), keyValueParser(new KeyValueSampleStreamParser(output, dict)) {
- // Directly select the parser now until we have more than one parser
- parser = std::move(keyValueParser);
- type_ = sample_format_type::KEY_VALUE;
-}
-
-void AutoSampleParser::process(mutable_buffers_1 buffer) {
- if (parser) {
- parser->process(buffer);
- } else {
- throw runtime_error("Not implemented yet");
- }
-}
-
-string to_string(const sample_format_type &arg) {
- if (arg == sample_format_type::AUTO)
- return "auto";
- else if (arg == sample_format_type::CSV)
- return "csv";
- else if (arg == sample_format_type::JSON)
- return "json";
- else if (arg == sample_format_type::KEY_VALUE)
- return "key-value";
- else if (arg == sample_format_type::SQL)
- return "sql";
- else if (arg == sample_format_type::RRD)
- return "rrd";
- else
- return "unknown";
-}
-
-std::ostream& operator<<(std::ostream& os, sample_format_type const& type) {
- return os << to_string(type);
-}
-
-std::istream& operator>>(std::istream& is, sample_format_type& type) {
- string s;
- is >> s;
-
- if (s == "auto") {
- type = sample_format_type::AUTO;
- } else if (s == "csv") {
- type = sample_format_type::CSV;
- } else if (s == "key-value") {
- type = sample_format_type::KEY_VALUE;
- } else if (s == "json") {
- type = sample_format_type::JSON;
- } else if (s == "sql") {
- type = sample_format_type::SQL;
- } else if (s == "rrd") {
- type = sample_format_type::RRD;
- }
-
- return is;
-}
-
-unique_ptr<SampleStreamParser> open_sample_input_stream(
- shared_ptr<SampleOutputStream> output,
- KeyDictionary &dict,
- sample_format_type type) {
- if (type == sample_format_type::KEY_VALUE) {
- return make_unique<KeyValueSampleStreamParser>(output, dict);
- } else if (type == sample_format_type::AUTO) {
- return make_unique<AutoSampleParser>(output, dict);
- } else {
- throw sample_exception("No parser for format type: " + to_string(type));
- }
-}
-
-template<typename T>
-o<T *> find_option(vector<sample_output_stream_option *> &options) {
- for (sample_output_stream_option *& option : options) {
- T *x = dynamic_cast<T *>(option);
-
- if (x != nullptr) {
- return o<T *>(x);
- }
- }
-
- return o<T *>();
-}
-
-unique_ptr<SampleOutputStream> open_sample_output_stream(
- shared_ptr<ostream> output,
- KeyDictionary &dict,
- sample_format_type type,
- vector<sample_output_stream_option *> options) {
-
- if (type == sample_format_type::CSV) {
- return make_unique<CsvSampleOutputStream>(output, dict);
- } else if (type == sample_format_type::KEY_VALUE) {
- return make_unique<KeyValueSampleOutputStream>(output, dict);
- } else if (type == sample_format_type::JSON) {
- return make_unique<JsonSampleOutputStream>(output, dict);
- } else if (type == sample_format_type::RRD) {
- o<output_fields *> of = find_option<output_fields>(options);
-
- o<timestamp_field *> tsf = find_option<timestamp_field>(options);
-
- auto timestamp_key = dict.indexOf(tsf ? tsf.get()->name : "timestamp");
-
- return make_unique<RrdSampleOutputStream>(output, dict, timestamp_key, of);
-// } else if (type == sample_format_type::SQL) {
-// return make_unique<SqlSampleOutputStream>(dict, move(output), table_name);
- } else {
- throw sample_exception("No writer for format type: " + to_string(type));
- }
-}
-
-//template<typename T>
-ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying) : underlying(move(underlying)) {
-}
-
-//template<typename T>
-void ThreadSafeSampleOutputStream::write(SampleRecord const &sample) {
- std::unique_lock<std::mutex> lock(mutex);
-
- underlying->write(sample);
-}
-
-}
-}