#include "trygvis/sensor/io.h" #include #include #include #include #include "json.hpp" #include "boost/regex.hpp" namespace trygvis { namespace sensor { namespace io { using namespace std; using json = nlohmann::json; ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr underlying) : underlying(move(underlying)) { } void ThreadSafeSampleOutputStream::write(SampleRecord const &sample) { std::unique_lock lock(mutex); underlying->write(sample); } void VectorSampleOutputStream::write(SampleRecord const &sample) { if (sample.empty()) { return; } samples.emplace_back(sample); } CsvSampleOutputStream::CsvSampleOutputStream(shared_ptr stream, KeyDictionary &dict) : stream(move(stream)), headerWritten(false), dict(dict) { } void CsvSampleOutputStream::write(SampleRecord const &sample) { // Skip empty records if (sample.empty()) { return; } // Build the dict with the keys from the first sample. if (dict.empty()) { SampleKeyIndex index = 0; auto ptr = sample.cbegin(); while (ptr != sample.cend()) { auto o = *ptr; if (o) { auto name = sample.dict.at(index)->name; dict.indexOf(name); } ptr++; index++; } } if (!headerWritten) { writeHeader(); headerWritten = true; } auto &s = *stream.get(); auto it = dict.begin(); while (it != dict.end()) { if (it != dict.begin()) { s << ","; } auto key = *it++; auto sampleKey = sample.dict.indexOf(key->name); auto o = sample.at(sampleKey); if (o) { s << o.get(); } } s << endl; } void CsvSampleOutputStream::writeHeader() { auto &s = *stream.get(); auto i = dict.begin(); while (i != dict.end()) { s << (*i)->name; i++; if (i != dict.end()) { s << ","; } } s << endl; } JsonSampleOutputStream::JsonSampleOutputStream(shared_ptr stream, KeyDictionary &dict) : dict(dict), stream(move(stream)) { } void JsonSampleOutputStream::write(SampleRecord const &sample) { // Skip empty records if (sample.empty()) { return; } json doc({}); if (!dict.empty()) { for (auto &key: dict) { auto sampleKey = sample.dict.indexOf(key->name); auto value = sample.at(sampleKey); if (value) { doc[key->name] = value.get(); } } } else { for (auto &sampleKey: sample.dict) { auto o = sample.at(sampleKey); if (o) { // Make sure that the key is registered in the dictionary dict.indexOf(sampleKey->name); doc[sampleKey->name] = o.get(); } } } *stream.get() << doc << endl; } KeyValueSampleOutputStream::KeyValueSampleOutputStream(shared_ptr stream, KeyDictionary &dict) : dict(dict), stream(move(stream)) { } void KeyValueSampleOutputStream::write(SampleRecord const &sample) { // Skip empty records if (sample.empty()) { return; } auto &s = *stream.get(); bool first = true; if (!dict.empty()) { for (auto &key: dict) { auto sampleKey = sample.dict.indexOf(key->name); auto value = sample.at(sampleKey); if (value) { if (first) { first = false; } else { s << ", "; } s << key->name << "=" << value.get(); } } } else { for (auto &sampleKey: sample.dict) { auto o = sample.at(sampleKey); if (o) { if (first) { first = false; } else { s << ", "; } // Make sure that the key is registered in the dictionary dict.indexOf(sampleKey->name); s << sampleKey->name << "=" << o.get(); } } } *stream.get() << endl; } RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr stream, KeyDictionary &dict, const SampleKey *timestamp_key, o output_fields) : stream(move(stream)), timestamp_key(timestamp_key) { if (output_fields) { for (auto field : output_fields.get()->fields) { keys.emplace_back(dict.indexOf(field)); } } else { for (auto key : dict) { keys.emplace_back(key); } } } void RrdSampleOutputStream::write(SampleRecord const &sample) { // Skip empty records if (sample.empty()) { return; } auto &s = *stream.get(); auto timestampO = sample.at(timestamp_key); if (!timestampO) { return; } auto timestamp = timestampO.get(); s << timestamp; bool first = true; for (auto &key: keys) { if (key == timestamp_key) { continue; } auto value = sample.at(key); if (first) { s << "@"; first = false; } else { s << ":"; } s << (value ? value.get() : "U"); } *stream.get() << endl; } SqlSampleOutputStream::SqlSampleOutputStream(shared_ptr stream, KeyDictionary &dict, string table_name) : dict(dict), stream(move(stream)), table_name(table_name) { } void SqlSampleOutputStream::write(SampleRecord const &values) { throw sample_exception("deimplemented"); // string fs, vs; // // fs.reserve(1024); // vs.reserve(1024); // // if (filter_fields) { // auto i = fields.begin(); // // while (i != fields.end()) { // auto field = *i; // // fs += field; // // auto value = values.find(field); // // if (value != values.end()) { // vs += "'" + value->second + "'"; // } else { // vs += "NULL"; // } // // i++; // // if (i != fields.end()) { // fs += ","; // vs += ","; // } // } // } else { // auto i = values.begin(); // while (i != values.end()) { // auto v = *i++; // // fs += v.first; // vs += "'" + v.second + "'"; // // if (i != values.end()) { // fs += ","; // vs += ","; // } // } // } // // (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl; } void KeyValueSampleStreamParser::process(mutable_buffers_1 buffer) { size_t size = buffer_size(buffer); if (size == 0 && line->size()) { process_line(line); line = make_shared>(); return; } auto data = boost::asio::buffer_cast(buffer); for (int i = 0; i < size; i++) { uint8_t b = data[i]; if (b == packet_delimiter) { process_line(line); line = make_shared>(); } else { line->push_back(b); } } } void KeyValueSampleStreamParser::process_line(shared_ptr> packet) { auto timestamp = std::chrono::system_clock::now().time_since_epoch().count(); auto s = std::string((char *) packet->data(), packet->size()); static const boost::regex e("([#_a-zA-Z0-9]+) *= *([0-9]+)"); auto start = s.cbegin(); auto end = s.cend(); boost::match_results what; boost::match_flag_type flags = boost::match_default; SampleRecord sample(dict); while (regex_search(start, end, what, e, flags)) { auto name = static_cast(what[1]); auto value = static_cast(what[2]); start = what[0].second; auto key = dict.indexOf(name); sample.set(key, value); flags |= boost::match_prev_avail; flags |= boost::match_not_bob; } output->write(sample); } AutoSampleParser::AutoSampleParser(shared_ptr output, KeyDictionary &dict) : SampleStreamParser(sample_format_type::AUTO), keyValueParser(new KeyValueSampleStreamParser(output, dict)) { // Directly select the parser now until we have more than one parser parser = std::move(keyValueParser); type_ = sample_format_type::KEY_VALUE; } void AutoSampleParser::process(mutable_buffers_1 buffer) { if (parser) { parser->process(buffer); } else { throw runtime_error("Not implemented yet"); } } unique_ptr open_sample_input_stream( shared_ptr output, KeyDictionary &dict, sample_format_type type) { if (type == sample_format_type::KEY_VALUE) { return make_unique(output, dict); } else if (type == sample_format_type::AUTO) { return make_unique(output, dict); } else { throw sample_exception("No parser for format type: " + to_string(type)); } } template o find_option(vector &options) { for (sample_output_stream_option *&option : options) { T *x = dynamic_cast(option); if (x != nullptr) { return o(x); } } return o(); } unique_ptr open_sample_output_stream( shared_ptr output, KeyDictionary &dict, sample_format_type type, vector options) { if (type == sample_format_type::CSV) { return make_unique(output, dict); } else if (type == sample_format_type::KEY_VALUE) { return make_unique(output, dict); } else if (type == sample_format_type::JSON) { return make_unique(output, dict); } else if (type == sample_format_type::RRD) { o of = find_option(options); o tsf = find_option(options); auto timestamp_key = dict.indexOf(tsf ? tsf.get()->name : "timestamp"); return make_unique(output, dict, timestamp_key, of); // } else if (type == sample_format_type::SQL) { // return make_unique(dict, move(output), table_name); } else { throw sample_exception("No writer for format type: " + to_string(type)); } } } } }