aboutsummaryrefslogtreecommitdiff
path: root/sensor/main/io.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'sensor/main/io.cpp')
-rw-r--r--sensor/main/io.cpp418
1 files changed, 418 insertions, 0 deletions
diff --git a/sensor/main/io.cpp b/sensor/main/io.cpp
new file mode 100644
index 0000000..18040f1
--- /dev/null
+++ b/sensor/main/io.cpp
@@ -0,0 +1,418 @@
+#include "trygvis/sensor/io.h"
+
+#include <ostream>
+#include <vector>
+#include <map>
+#include <mutex>
+#include "json.hpp"
+#include "boost/regex.hpp"
+
+namespace trygvis {
+namespace sensor {
+namespace io {
+
+using namespace std;
+using json = nlohmann::json;
+
+ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying)
+ : underlying(move(underlying)) {
+}
+
+void ThreadSafeSampleOutputStream::write(SampleRecord const &sample) {
+ std::unique_lock<std::mutex> lock(mutex);
+
+ underlying->write(sample);
+}
+
+void VectorSampleOutputStream::write(SampleRecord const &sample) {
+ if (sample.empty()) {
+ return;
+ }
+
+ samples.emplace_back(sample);
+}
+
+CsvSampleOutputStream::CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict)
+ : stream(move(stream)), headerWritten(false), dict(dict) {
+}
+
+void CsvSampleOutputStream::write(SampleRecord const &sample) {
+// Skip empty records
+ if (sample.empty()) {
+ return;
+ }
+
+// Build the dict with the keys from the first sample.
+ if (dict.empty()) {
+ SampleKeyIndex index = 0;
+ auto ptr = sample.cbegin();
+ while (ptr != sample.cend()) {
+ auto o = *ptr;
+
+ if (o) {
+ auto name = sample.dict.at(index)->name;
+ dict.indexOf(name);
+ }
+
+ ptr++;
+ index++;
+ }
+ }
+
+ if (!headerWritten) {
+ writeHeader();
+ headerWritten = true;
+ }
+
+ auto &s = *stream.get();
+
+ auto it = dict.begin();
+ while (it != dict.end()) {
+ if (it != dict.begin()) {
+ s << ",";
+ }
+
+ auto key = *it++;
+ auto sampleKey = sample.dict.indexOf(key->name);
+ auto o = sample.at(sampleKey);
+
+ if (o) {
+ s << o.get();
+ }
+ }
+
+ s << endl;
+}
+
+void CsvSampleOutputStream::writeHeader() {
+ auto &s = *stream.get();
+
+ auto i = dict.begin();
+ while (i != dict.end()) {
+ s << (*i)->name;
+
+ i++;
+
+ if (i != dict.end()) {
+ s << ",";
+ }
+ }
+
+ s << endl;
+}
+
+JsonSampleOutputStream::JsonSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict) :
+ dict(dict), stream(move(stream)) {
+}
+
+void JsonSampleOutputStream::write(SampleRecord const &sample) {
+// Skip empty records
+ if (sample.empty()) {
+ return;
+ }
+
+ json doc({});
+
+ if (!dict.empty()) {
+ for (auto &key: dict) {
+ auto sampleKey = sample.dict.indexOf(key->name);
+
+ auto value = sample.at(sampleKey);
+
+ if (value) {
+ doc[key->name] = value.get();
+ }
+ }
+ } else {
+ for (auto &sampleKey: sample.dict) {
+ auto o = sample.at(sampleKey);
+
+ if (o) {
+// Make sure that the key is registered in the dictionary
+ dict.indexOf(sampleKey->name);
+ doc[sampleKey->name] = o.get();
+ }
+ }
+ }
+
+ *stream.get() << doc << endl;
+}
+
+KeyValueSampleOutputStream::KeyValueSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict) :
+ dict(dict), stream(move(stream)) {
+}
+
+void KeyValueSampleOutputStream::write(SampleRecord const &sample) {
+// Skip empty records
+ if (sample.empty()) {
+ return;
+ }
+
+ auto &s = *stream.get();
+
+ bool first = true;
+ if (!dict.empty()) {
+ for (auto &key: dict) {
+ auto sampleKey = sample.dict.indexOf(key->name);
+
+ auto value = sample.at(sampleKey);
+
+ if (value) {
+ if (first) {
+ first = false;
+ } else {
+ s << ", ";
+ }
+ s << key->name << "=" << value.get();
+ }
+ }
+ } else {
+ for (auto &sampleKey: sample.dict) {
+ auto o = sample.at(sampleKey);
+
+ if (o) {
+ if (first) {
+ first = false;
+ } else {
+ s << ", ";
+ }
+// Make sure that the key is registered in the dictionary
+ dict.indexOf(sampleKey->name);
+ s << sampleKey->name << "=" << o.get();
+ }
+ }
+ }
+
+ *stream.get() << endl;
+}
+
+RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey *timestamp_key, o<output_fields *> output_fields)
+ :
+ stream(move(stream)), timestamp_key(timestamp_key) {
+ if (output_fields) {
+ for (auto field : output_fields.get()->fields) {
+ keys.emplace_back(dict.indexOf(field));
+ }
+ } else {
+ for (auto key : dict) {
+ keys.emplace_back(key);
+ }
+ }
+}
+
+void RrdSampleOutputStream::write(SampleRecord const &sample) {
+// Skip empty records
+ if (sample.empty()) {
+ return;
+ }
+
+ auto &s = *stream.get();
+
+ auto timestampO = sample.at(timestamp_key);
+
+ if (!timestampO) {
+ return;
+ }
+
+ auto timestamp = timestampO.get();
+
+ s << timestamp;
+
+ bool first = true;
+ for (auto &key: keys) {
+ if (key == timestamp_key) {
+ continue;
+ }
+
+ auto value = sample.at(key);
+
+ if (first) {
+ s << "@";
+ first = false;
+ } else {
+ s << ":";
+ }
+
+ s << (value ? value.get() : "U");
+ }
+
+ *stream.get() << endl;
+}
+
+SqlSampleOutputStream::SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name) :
+ dict(dict), stream(move(stream)), table_name(table_name) {
+}
+
// SQL output has been deliberately disabled ("deimplemented"); the old
// INSERT-building implementation is kept below for reference until it is
// ported to the current SampleRecord/KeyDictionary API.
void SqlSampleOutputStream::write(SampleRecord const &values) {
    throw sample_exception("deimplemented");

//    string fs, vs;
//
//    fs.reserve(1024);
//    vs.reserve(1024);
//
//    if (filter_fields) {
//        auto i = fields.begin();
//
//        while (i != fields.end()) {
//            auto field = *i;
//
//            fs += field;
//
//            auto value = values.find(field);
//
//            if (value != values.end()) {
//                vs += "'" + value->second + "'";
//            } else {
//                vs += "NULL";
//            }
//
//            i++;
//
//            if (i != fields.end()) {
//                fs += ",";
//                vs += ",";
//            }
//        }
//    } else {
//        auto i = values.begin();
//        while (i != values.end()) {
//            auto v = *i++;
//
//            fs += v.first;
//            vs += "'" + v.second + "'";
//
//            if (i != values.end()) {
//                fs += ",";
//                vs += ",";
//            }
//        }
//    }
//
//    (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl;
}
+
+void KeyValueSampleStreamParser::process(mutable_buffers_1 buffer) {
+
+ size_t size = buffer_size(buffer);
+
+ if (size == 0 && line->size()) {
+ process_line(line);
+ line = make_shared<vector<uint8_t>>();
+ return;
+ }
+
+ auto data = boost::asio::buffer_cast<const uint8_t *>(buffer);
+
+ for (int i = 0; i < size; i++) {
+ uint8_t b = data[i];
+
+ if (b == packet_delimiter) {
+ process_line(line);
+ line = make_shared<vector<uint8_t>>();
+ } else {
+ line->push_back(b);
+ }
+ }
+
+}
+
// Parses one delimiter-separated packet of "key = value" pairs into a
// SampleRecord and forwards it to the output stream. Keys may contain
// letters, digits, '_' and '#'; values are decimal digit runs.
void KeyValueSampleStreamParser::process_line(shared_ptr<vector<uint8_t>> packet) {
    // NOTE(review): `timestamp` is computed but never attached to the
    // sample — presumably it was meant to become a sample key; confirm.
    auto timestamp = std::chrono::system_clock::now().time_since_epoch().count();
    auto s = std::string((char *) packet->data(), packet->size());

    static const boost::regex e("([#_a-zA-Z0-9]+) *= *([0-9]+)");

    auto start = s.cbegin();
    auto end = s.cend();
    boost::match_results <std::string::const_iterator> what;
    boost::match_flag_type flags = boost::match_default;

    SampleRecord sample(dict);

    // Scan the packet repeatedly, advancing `start` past each match.
    while (regex_search(start, end, what, e, flags)) {
        auto name = static_cast<string>(what[1]);
        auto value = static_cast<string>(what[2]);
        start = what[0].second;

        auto key = dict.indexOf(name);
        sample.set(key, value);

        // Later searches continue within the same subject string, so tell
        // boost.regex that `start` is not the beginning of the buffer.
        flags |= boost::match_prev_avail;
        flags |= boost::match_not_bob;
    }

    // An empty sample (no matches) is still passed on; downstream writers
    // skip empty records themselves.
    output->write(sample);
}
+
// "Auto-detecting" parser. The base is constructed with the AUTO type,
// but since key/value is the only concrete parser so far it is selected
// immediately and type_ is switched to KEY_VALUE; keyValueParser is left
// empty after the move.
AutoSampleParser::AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) :
    SampleStreamParser(sample_format_type::AUTO), keyValueParser(new KeyValueSampleStreamParser(output, dict)) {
    // Directly select the parser now until we have more than one parser
    parser = std::move(keyValueParser);
    type_ = sample_format_type::KEY_VALUE;
}
+
+void AutoSampleParser::process(mutable_buffers_1 buffer) {
+ if (parser) {
+ parser->process(buffer);
+ } else {
+ throw runtime_error("Not implemented yet");
+ }
+}
+
+unique_ptr<SampleStreamParser> open_sample_input_stream(
+ shared_ptr<SampleOutputStream> output,
+ KeyDictionary &dict,
+ sample_format_type type) {
+ if (type == sample_format_type::KEY_VALUE) {
+ return make_unique<KeyValueSampleStreamParser>(output, dict);
+ } else if (type == sample_format_type::AUTO) {
+ return make_unique<AutoSampleParser>(output, dict);
+ } else {
+ throw sample_exception("No parser for format type: " + to_string(type));
+ }
+}
+
+template<typename T>
+o<T *> find_option(vector<sample_output_stream_option *> &options) {
+ for (sample_output_stream_option *&option : options) {
+ T *x = dynamic_cast<T *>(option);
+
+ if (x != nullptr) {
+ return o<T *>(x);
+ }
+ }
+
+ return o<T *>();
+}
+
+unique_ptr<SampleOutputStream> open_sample_output_stream(
+ shared_ptr<ostream> output,
+ KeyDictionary &dict,
+ sample_format_type type,
+ vector<sample_output_stream_option *> options) {
+
+ if (type == sample_format_type::CSV) {
+ return make_unique<CsvSampleOutputStream>(output, dict);
+ } else if (type == sample_format_type::KEY_VALUE) {
+ return make_unique<KeyValueSampleOutputStream>(output, dict);
+ } else if (type == sample_format_type::JSON) {
+ return make_unique<JsonSampleOutputStream>(output, dict);
+ } else if (type == sample_format_type::RRD) {
+ o<output_fields *> of = find_option<output_fields>(options);
+
+ o<timestamp_field *> tsf = find_option<timestamp_field>(options);
+
+ auto timestamp_key = dict.indexOf(tsf ? tsf.get()->name : "timestamp");
+
+ return make_unique<RrdSampleOutputStream>(output, dict, timestamp_key, of);
+// } else if (type == sample_format_type::SQL) {
+// return make_unique<SqlSampleOutputStream>(dict, move(output), table_name);
+ } else {
+ throw sample_exception("No writer for format type: " + to_string(type));
+ }
+}
+
+}
+}
+} \ No newline at end of file