diff options
Diffstat (limited to 'sensor/main/SensorSample.cpp')
-rw-r--r-- | sensor/main/SensorSample.cpp | 454 |
1 files changed, 0 insertions, 454 deletions
diff --git a/sensor/main/SensorSample.cpp b/sensor/main/SensorSample.cpp deleted file mode 100644 index 5f0e9c6..0000000 --- a/sensor/main/SensorSample.cpp +++ /dev/null @@ -1,454 +0,0 @@ -#include "trygvis/SensorSample.h" - -#include "json.hpp" -#include <set> -#include <boost/regex.hpp> -#include <chrono> - -namespace trygvis { -namespace sensor { - -using namespace std; -using json = nlohmann::json; - -void VectorSampleOutputStream::write(SampleRecord const &sample) { - if (sample.empty()) { - return; - } - - samples.emplace_back(sample); -} - -CsvSampleOutputStream::CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict) - : stream(move(stream)), headerWritten(false), dict(dict) { -} - -void CsvSampleOutputStream::write(SampleRecord const &sample) { - // Skip empty records - if (sample.empty()) { - return; - } - - // Build the dict with the keys from the first sample. - if (dict.empty()) { - SampleKeyIndex index = 0; - auto ptr = sample.cbegin(); - while (ptr != sample.cend()) { - auto o = *ptr; - - if (o) { - auto name = sample.dict.at(index)->name; - dict.indexOf(name); - } - - ptr++; - index++; - } - } - - if (!headerWritten) { - writeHeader(); - headerWritten = true; - } - - auto &s = *stream.get(); - - auto it = dict.begin(); - while (it != dict.end()) { - if (it != dict.begin()) { - s << ","; - } - - auto key = *it++; - auto sampleKey = sample.dict.indexOf(key->name); - auto o = sample.at(sampleKey); - - if (o) { - s << o.get(); - } - } - - s << endl; -} - -void CsvSampleOutputStream::writeHeader() { - auto &s = *stream.get(); - - auto i = dict.begin(); - while (i != dict.end()) { - s << (*i)->name; - - i++; - - if (i != dict.end()) { - s << ","; - } - } - - s << endl; -} - -JsonSampleOutputStream::JsonSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict) : - dict(dict), stream(move(stream)) { -} - -void JsonSampleOutputStream::write(SampleRecord const &sample) { - // Skip empty records - if (sample.empty()) { - return; - } - - json doc({}); - - if (!dict.empty()) { - for (auto &key: dict) { - auto sampleKey = sample.dict.indexOf(key->name); - - auto value = sample.at(sampleKey); - - if (value) { - doc[key->name] = value.get(); - } - } - } else { - for (auto &sampleKey: sample.dict) { - auto o = sample.at(sampleKey); - - if (o) { - // Make sure that the key is registered in the dictionary - dict.indexOf(sampleKey->name); - doc[sampleKey->name] = o.get(); - } - } - } - - *stream.get() << doc << endl; -} - -KeyValueSampleOutputStream::KeyValueSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict) : - dict(dict), stream(move(stream)) { -} - -void KeyValueSampleOutputStream::write(SampleRecord const &sample) { - // Skip empty records - if (sample.empty()) { - return; - } - - auto &s = *stream.get(); - - bool first = true; - if (!dict.empty()) { - for (auto &key: dict) { - auto sampleKey = sample.dict.indexOf(key->name); - - auto value = sample.at(sampleKey); - - if (value) { - if (first) { - first = false; - } else { - s << ", "; - } - s << key->name << "=" << value.get(); - } - } - } else { - for (auto &sampleKey: sample.dict) { - auto o = sample.at(sampleKey); - - if (o) { - if (first) { - first = false; - } else { - s << ", "; - } - // Make sure that the key is registered in the dictionary - dict.indexOf(sampleKey->name); - s << sampleKey->name << "=" << o.get(); - } - } - } - - *stream.get() << endl; -} - -RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey* timestamp_key, o<output_fields *> output_fields) : - stream(move(stream)), timestamp_key(timestamp_key) { - if (output_fields) { - for (auto field : output_fields.get()->fields) { - keys.emplace_back(dict.indexOf(field)); - } - } else { - for (auto key : dict) { - keys.emplace_back(key); - } - } -} - -void RrdSampleOutputStream::write(SampleRecord const &sample) { - // Skip empty records - if (sample.empty()) { - return; - } - - auto &s = *stream.get(); - - auto timestampO = sample.at(timestamp_key); - - if (!timestampO) { - return; - } - - auto timestamp = timestampO.get(); - - s << timestamp; - - bool first = true; - for (auto &key: keys) { - if (key == timestamp_key) { - continue; - } - - auto value = sample.at(key); - - if (first) { - s << "@"; - first = false; - } else { - s << ":"; - } - - s << (value ? value.get() : "U"); - } - - *stream.get() << endl; -} - -SqlSampleOutputStream::SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name) : - dict(dict), stream(move(stream)), table_name(table_name) { -} - -void SqlSampleOutputStream::write(SampleRecord const &values) { - throw sample_exception("deimplemented"); - -// string fs, vs; -// -// fs.reserve(1024); -// vs.reserve(1024); -// -// if (filter_fields) { -// auto i = fields.begin(); -// -// while (i != fields.end()) { -// auto field = *i; -// -// fs += field; -// -// auto value = values.find(field); -// -// if (value != values.end()) { -// vs += "'" + value->second + "'"; -// } else { -// vs += "NULL"; -// } -// -// i++; -// -// if (i != fields.end()) { -// fs += ","; -// vs += ","; -// } -// } -// } else { -// auto i = values.begin(); -// while (i != values.end()) { -// auto v = *i++; -// -// fs += v.first; -// vs += "'" + v.second + "'"; -// -// if (i != values.end()) { -// fs += ","; -// vs += ","; -// } -// } -// } -// -// (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl; -} - -void KeyValueSampleStreamParser::process(mutable_buffers_1 buffer) { - - size_t size = buffer_size(buffer); - - if (size == 0 && line->size()) { - process_line(line); - line = make_shared<vector<uint8_t>>(); - return; - } - - auto data = boost::asio::buffer_cast<const uint8_t *>(buffer); - - for (int i = 0; i < size; i++) { - uint8_t b = data[i]; - - if (b == packet_delimiter) { - process_line(line); - line = make_shared<vector<uint8_t>>(); - } else { - line->push_back(b); - } - } - -} - -void KeyValueSampleStreamParser::process_line(shared_ptr<vector<uint8_t>> packet) { - auto timestamp = std::chrono::system_clock::now().time_since_epoch().count(); - auto s = std::string((char *) packet->data(), packet->size()); - - static const boost::regex e("([#_a-zA-Z0-9]+) *= *([0-9]+)"); - - auto start = s.cbegin(); - auto end = s.cend(); - boost::match_results<std::string::const_iterator> what; - boost::match_flag_type flags = boost::match_default; - - SampleRecord sample(dict); - - while (regex_search(start, end, what, e, flags)) { - auto name = static_cast<string>(what[1]); - auto value = static_cast<string>(what[2]); - start = what[0].second; - - auto key = dict.indexOf(name); - sample.set(key, value); - - flags |= boost::match_prev_avail; - flags |= boost::match_not_bob; - } - - output->write(sample); -} - -AutoSampleParser::AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) : - SampleStreamParser(sample_format_type::AUTO), keyValueParser(new KeyValueSampleStreamParser(output, dict)) { - // Directly select the parser now until we have more than one parser - parser = std::move(keyValueParser); - type_ = sample_format_type::KEY_VALUE; -} - -void AutoSampleParser::process(mutable_buffers_1 buffer) { - if (parser) { - parser->process(buffer); - } else { - throw runtime_error("Not implemented yet"); - } -} - -string to_string(const sample_format_type &arg) { - if (arg == sample_format_type::AUTO) - return "auto"; - else if (arg == sample_format_type::CSV) - return "csv"; - else if (arg == sample_format_type::JSON) - return "json"; - else if (arg == sample_format_type::KEY_VALUE) - return "key-value"; - else if (arg == sample_format_type::SQL) - return "sql"; - else if (arg == sample_format_type::RRD) - return "rrd"; - else - return "unknown"; -} - -std::ostream& operator<<(std::ostream& os, sample_format_type const& type) { - return os << to_string(type); -} - -std::istream& operator>>(std::istream& is, sample_format_type& type) { - string s; - is >> s; - - if (s == "auto") { - type = sample_format_type::AUTO; - } else if (s == "csv") { - type = sample_format_type::CSV; - } else if (s == "key-value") { - type = sample_format_type::KEY_VALUE; - } else if (s == "json") { - type = sample_format_type::JSON; - } else if (s == "sql") { - type = sample_format_type::SQL; - } else if (s == "rrd") { - type = sample_format_type::RRD; - } - - return is; -} - -unique_ptr<SampleStreamParser> open_sample_input_stream( - shared_ptr<SampleOutputStream> output, - KeyDictionary &dict, - sample_format_type type) { - if (type == sample_format_type::KEY_VALUE) { - return make_unique<KeyValueSampleStreamParser>(output, dict); - } else if (type == sample_format_type::AUTO) { - return make_unique<AutoSampleParser>(output, dict); - } else { - throw sample_exception("No parser for format type: " + to_string(type)); - } -} - -template<typename T> -o<T *> find_option(vector<sample_output_stream_option *> &options) { - for (sample_output_stream_option *& option : options) { - T *x = dynamic_cast<T *>(option); - - if (x != nullptr) { - return o<T *>(x); - } - } - - return o<T *>(); -} - -unique_ptr<SampleOutputStream> open_sample_output_stream( - shared_ptr<ostream> output, - KeyDictionary &dict, - sample_format_type type, - vector<sample_output_stream_option *> options) { - - if (type == sample_format_type::CSV) { - return make_unique<CsvSampleOutputStream>(output, dict); - } else if (type == sample_format_type::KEY_VALUE) { - return make_unique<KeyValueSampleOutputStream>(output, dict); - } else if (type == sample_format_type::JSON) { - return make_unique<JsonSampleOutputStream>(output, dict); - } else if (type == sample_format_type::RRD) { - o<output_fields *> of = find_option<output_fields>(options); - - o<timestamp_field *> tsf = find_option<timestamp_field>(options); - - auto timestamp_key = dict.indexOf(tsf ? tsf.get()->name : "timestamp"); - - return make_unique<RrdSampleOutputStream>(output, dict, timestamp_key, of); -// } else if (type == sample_format_type::SQL) { -// return make_unique<SqlSampleOutputStream>(dict, move(output), table_name); - } else { - throw sample_exception("No writer for format type: " + to_string(type)); - } -} - -ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying) : underlying(move(underlying)) { -} - -void ThreadSafeSampleOutputStream::write(SampleRecord const &sample) { - std::unique_lock<std::mutex> lock(mutex); - - underlying->write(sample); -} - -} -} |