diff options
Diffstat (limited to 'sensor/main')
-rw-r--r-- | sensor/main/SensorSample.cpp | 456 |
1 files changed, 456 insertions, 0 deletions
diff --git a/sensor/main/SensorSample.cpp b/sensor/main/SensorSample.cpp new file mode 100644 index 0000000..6ec6dfc --- /dev/null +++ b/sensor/main/SensorSample.cpp @@ -0,0 +1,456 @@ +#include "SensorSample.h" + +#include "json.hpp" +#include <set> +#include <boost/regex.hpp> +#include <chrono> + +namespace trygvis { +namespace soil_moisture { + +using namespace std; +using json = nlohmann::json; + +void VectorSampleOutputStream::write(SampleRecord const &sample) { + if (sample.empty()) { + return; + } + + samples.emplace_back(sample); +} + +CsvSampleOutputStream::CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict) + : stream(move(stream)), headerWritten(false), dict(dict) { +} + +void CsvSampleOutputStream::write(SampleRecord const &sample) { + // Skip empty records + if (sample.empty()) { + return; + } + + // Build the dict with the keys from the first sample. + if (dict.empty()) { + SampleKeyIndex index = 0; + auto ptr = sample.cbegin(); + while (ptr != sample.cend()) { + auto o = *ptr; + + if (o) { + auto name = sample.dict.at(index)->name; + dict.indexOf(name); + } + + ptr++; + index++; + } + } + + if (!headerWritten) { + writeHeader(); + headerWritten = true; + } + + auto &s = *stream.get(); + + auto it = dict.begin(); + while (it != dict.end()) { + if (it != dict.begin()) { + s << ","; + } + + auto key = *it++; + auto sampleKey = sample.dict.indexOf(key->name); + auto o = sample.at(sampleKey); + + if (o) { + s << o.get(); + } + } + + s << endl; +} + +void CsvSampleOutputStream::writeHeader() { + auto &s = *stream.get(); + + auto i = dict.begin(); + while (i != dict.end()) { + s << (*i)->name; + + i++; + + if (i != dict.end()) { + s << ","; + } + } + + s << endl; +} + +JsonSampleOutputStream::JsonSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict) : + dict(dict), stream(move(stream)) { +} + +void JsonSampleOutputStream::write(SampleRecord const &sample) { + // Skip empty records + if (sample.empty()) { + return; + } + + json doc({}); + + if (!dict.empty()) { + for (auto &key: dict) { + auto sampleKey = sample.dict.indexOf(key->name); + + auto value = sample.at(sampleKey); + + if (value) { + doc[key->name] = value.get(); + } + } + } else { + for (auto &sampleKey: sample.dict) { + auto o = sample.at(sampleKey); + + if (o) { + // Make sure that the key is registered in the dictionary + dict.indexOf(sampleKey->name); + doc[sampleKey->name] = o.get(); + } + } + } + + *stream.get() << doc << endl; +} + +KeyValueSampleOutputStream::KeyValueSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict) : + dict(dict), stream(move(stream)) { +} + +void KeyValueSampleOutputStream::write(SampleRecord const &sample) { + // Skip empty records + if (sample.empty()) { + return; + } + + auto &s = *stream.get(); + + bool first = true; + if (!dict.empty()) { + for (auto &key: dict) { + auto sampleKey = sample.dict.indexOf(key->name); + + auto value = sample.at(sampleKey); + + if (value) { + if (first) { + first = false; + } else { + s << ", "; + } + s << key->name << "=" << value.get(); + } + } + } else { + for (auto &sampleKey: sample.dict) { + auto o = sample.at(sampleKey); + + if (o) { + if (first) { + first = false; + } else { + s << ", "; + } + // Make sure that the key is registered in the dictionary + dict.indexOf(sampleKey->name); + s << sampleKey->name << "=" << o.get(); + } + } + } + + *stream.get() << endl; +} + +RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey* timestamp_key, o<output_fields *> output_fields) : + stream(move(stream)), timestamp_key(timestamp_key) { + if (output_fields) { + for (auto field : output_fields.get()->fields) { + keys.emplace_back(dict.indexOf(field)); + } + } else { + for (auto key : dict) { + keys.emplace_back(key); + } + } +} + +void RrdSampleOutputStream::write(SampleRecord const &sample) { + // Skip empty records + if (sample.empty()) { + return; + } + + auto &s = *stream.get(); + + auto timestampO = sample.at(timestamp_key); + + if (!timestampO) { + return; + } + + auto timestamp = timestampO.get(); + + s << timestamp; + + bool first = true; + for (auto &key: keys) { + if (key == timestamp_key) { + continue; + } + + auto value = sample.at(key); + + if (first) { + s << "@"; + first = false; + } else { + s << ":"; + } + + s << (value ? value.get() : "U"); + } + + *stream.get() << endl; +} + +SqlSampleOutputStream::SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name) : + dict(dict), stream(move(stream)), table_name(table_name) { +} + +void SqlSampleOutputStream::write(SampleRecord const &values) { + throw sample_exception("deimplemented"); + +// string fs, vs; +// +// fs.reserve(1024); +// vs.reserve(1024); +// +// if (filter_fields) { +// auto i = fields.begin(); +// +// while (i != fields.end()) { +// auto field = *i; +// +// fs += field; +// +// auto value = values.find(field); +// +// if (value != values.end()) { +// vs += "'" + value->second + "'"; +// } else { +// vs += "NULL"; +// } +// +// i++; +// +// if (i != fields.end()) { +// fs += ","; +// vs += ","; +// } +// } +// } else { +// auto i = values.begin(); +// while (i != values.end()) { +// auto v = *i++; +// +// fs += v.first; +// vs += "'" + v.second + "'"; +// +// if (i != values.end()) { +// fs += ","; +// vs += ","; +// } +// } +// } +// +// (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl; +} + +void KeyValueSampleStreamParser::process(mutable_buffers_1 buffer) { + + size_t size = buffer_size(buffer); + + if (size == 0 && line->size()) { + process_line(line); + line = make_shared<vector<uint8_t>>(); + return; + } + + auto data = boost::asio::buffer_cast<const uint8_t *>(buffer); + + for (int i = 0; i < size; i++) { + uint8_t b = data[i]; + + if (b == packet_delimiter) { + process_line(line); + line = make_shared<vector<uint8_t>>(); + } else { + line->push_back(b); + } + } + +} + +void KeyValueSampleStreamParser::process_line(shared_ptr<vector<uint8_t>> packet) { + auto timestamp = std::chrono::system_clock::now().time_since_epoch().count(); + auto s = std::string((char *) packet->data(), packet->size()); + + static const boost::regex e("([#_a-zA-Z0-9]+) *= *([0-9]+)"); + + auto start = s.cbegin(); + auto end = s.cend(); + boost::match_results<std::string::const_iterator> what; + boost::match_flag_type flags = boost::match_default; + + SampleRecord sample(dict); + + while (regex_search(start, end, what, e, flags)) { + auto name = static_cast<string>(what[1]); + auto value = static_cast<string>(what[2]); + start = what[0].second; + + auto key = dict.indexOf(name); + sample.set(key, value); + + flags |= boost::match_prev_avail; + flags |= boost::match_not_bob; + } + + output->write(sample); +} + +AutoSampleParser::AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) : + SampleStreamParser(sample_format_type::AUTO), keyValueParser(new KeyValueSampleStreamParser(output, dict)) { + // Directly select the parser now until we have more than one parser + parser = std::move(keyValueParser); + type_ = sample_format_type::KEY_VALUE; +} + +void AutoSampleParser::process(mutable_buffers_1 buffer) { + if (parser) { + parser->process(buffer); + } else { + throw runtime_error("Not implemented yet"); + } +} + +string to_string(const sample_format_type &arg) { + if (arg == sample_format_type::AUTO) + return "auto"; + else if (arg == sample_format_type::CSV) + return "csv"; + else if (arg == sample_format_type::JSON) + return "json"; + else if (arg == sample_format_type::KEY_VALUE) + return "key-value"; + else if (arg == sample_format_type::SQL) + return "sql"; + else if (arg == sample_format_type::RRD) + return "rrd"; + else + return "unknown"; +} + +std::ostream& operator<<(std::ostream& os, sample_format_type const& type) { + return os << to_string(type); +} + +std::istream& operator>>(std::istream& is, sample_format_type& type) { + string s; + is >> s; + + if (s == "auto") { + type = sample_format_type::AUTO; + } else if (s == "csv") { + type = sample_format_type::CSV; + } else if (s == "key-value") { + type = sample_format_type::KEY_VALUE; + } else if (s == "json") { + type = sample_format_type::JSON; + } else if (s == "sql") { + type = sample_format_type::SQL; + } else if (s == "rrd") { + type = sample_format_type::RRD; + } + + return is; +} + +unique_ptr<SampleStreamParser> open_sample_input_stream( + shared_ptr<SampleOutputStream> output, + KeyDictionary &dict, + sample_format_type type) { + if (type == sample_format_type::KEY_VALUE) { + return make_unique<KeyValueSampleStreamParser>(output, dict); + } else if (type == sample_format_type::AUTO) { + return make_unique<AutoSampleParser>(output, dict); + } else { + throw sample_exception("No parser for format type: " + to_string(type)); + } +} + +template<typename T> +o<T *> find_option(vector<sample_output_stream_option *> &options) { + for (sample_output_stream_option *& option : options) { + T *x = dynamic_cast<T *>(option); + + if (x != nullptr) { + return o<T *>(x); + } + } + + return o<T *>(); +} + +unique_ptr<SampleOutputStream> open_sample_output_stream( + shared_ptr<ostream> output, + KeyDictionary &dict, + sample_format_type type, + vector<sample_output_stream_option *> options) { + + if (type == sample_format_type::CSV) { + return make_unique<CsvSampleOutputStream>(output, dict); + } else if (type == sample_format_type::KEY_VALUE) { + return make_unique<KeyValueSampleOutputStream>(output, dict); + } else if (type == sample_format_type::JSON) { + return make_unique<JsonSampleOutputStream>(output, dict); + } else if (type == sample_format_type::RRD) { + o<output_fields *> of = find_option<output_fields>(options); + + o<timestamp_field *> tsf = find_option<timestamp_field>(options); + + auto timestamp_key = dict.indexOf(tsf ? tsf.get()->name : "timestamp"); + + return make_unique<RrdSampleOutputStream>(output, dict, timestamp_key, of); +// } else if (type == sample_format_type::SQL) { +// return make_unique<SqlSampleOutputStream>(dict, move(output), table_name); + } else { + throw sample_exception("No writer for format type: " + to_string(type)); + } +} + +//template<typename T> +ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying) : underlying(move(underlying)) { +} + +//template<typename T> +void ThreadSafeSampleOutputStream::write(SampleRecord const &sample) { + std::unique_lock<std::mutex> lock(mutex); + + underlying->write(sample); +} + +} +} |