#include "trygvis/sensor/io.h"

#include <cstddef>
#include <map>
#include <string>

#include <json.hpp>
#include <boost/tokenizer.hpp>
#include <boost/algorithm/string.hpp>

namespace trygvis {
namespace sensor {
namespace io {

using namespace std;
using namespace std::chrono;
using boost::tokenizer;
using boost::escaped_list_separator;
using json = nlohmann::json;

namespace {

/**
 * Encode a single CSV field following RFC 4180.
 *
 * A field containing a comma, a double quote or a line break is wrapped in double quotes and each embedded double
 * quote is doubled. Any other field is returned unchanged.
 */
string escape_csv_field(const string &field) {
    if (field.find_first_of(",\"\r\n") == string::npos) {
        return field;
    }

    string quoted;
    quoted.reserve(field.size() + 2);
    quoted += '"';
    for (char c : field) {
        if (c == '"') {
            quoted += '"';
        }
        quoted += c;
    }
    quoted += '"';
    return quoted;
}

/**
 * Wrap `text` in `quote` characters, doubling every embedded `quote` character.
 *
 * With '\'' this produces an SQL string literal and with '"' an SQL delimited identifier, so text such as "O'Brien"
 * cannot terminate the literal early and break (or inject into) the generated statement.
 */
string quote_sql(const string &text, char quote) {
    string quoted;
    quoted.reserve(text.size() + 2);
    quoted += quote;
    for (char c : text) {
        if (c == quote) {
            quoted += quote;
        }
        quoted += c;
    }
    quoted += quote;
    return quoted;
}

} // namespace

/**
 * Create a parser that reads the given input format and forwards every parsed sample to `output`.
 *
 * @throws sample_exception if there is no parser for `type`.
 */
unique_ptr<SampleStreamParser> open_sample_stream_parser(
        shared_ptr<SampleConsumer> output, KeyDictionary &dict, sample_format_type type) {
    if (type == sample_format_type::KEY_VALUE) {
        return make_unique<KeyValueSampleStreamParser>(move(output), dict);
    } else if (type == sample_format_type::AUTO) {
        return make_unique<AutoSampleParser>(move(output), dict);
    } else {
        throw sample_exception("No parser for format type: " + to_string(type));
    }
}

/**
 * Create a consumer that writes samples to `output` in the given format.
 *
 * RRD output uses the optional output_fields_option and timestamp_field_option (the timestamp key defaults to
 * "timestamp"). SQL output requires a table_name_option.
 *
 * @throws missing_required_option_error if SQL output is requested without a table name.
 * @throws sample_exception if there is no writer for `type`.
 */
unique_ptr<SampleConsumer> open_sample_writer(
        shared_ptr<ostream> output, KeyDictionary &dict, sample_format_type type, sample_output_stream_options options) {
    if (type == sample_format_type::CSV) {
        return make_unique<CsvWriterSampleConsumer>(move(output), dict);
    } else if (type == sample_format_type::KEY_VALUE) {
        return make_unique<KeyValueWriterSampleConsumer>(move(output), dict);
    } else if (type == sample_format_type::JSON) {
        return make_unique<JsonWriterSampleConsumer>(move(output), dict);
    } else if (type == sample_format_type::RRD) {
        auto of = options.find_option<output_fields_option>();
        auto tsf = options.find_option<timestamp_field_option>();

        auto timestamp_key = dict.indexOf(tsf ? tsf.value()->name : "timestamp");

        return make_unique<RrdWriterSampleConsumer>(move(output), dict, timestamp_key, of);
    } else if (type == sample_format_type::SQL) {
        auto tno = options.find_option<table_name_option>();

        if (!tno) {
            throw missing_required_option_error("table name");
        }

        return make_unique<SqlWriterSampleConsumer>(move(output), dict, tno.value()->name);
    } else {
        throw sample_exception("No writer for format type: " + to_string(type));
    }
}

ThreadSafeSampleConsumer::ThreadSafeSampleConsumer(unique_ptr<SampleConsumer> underlying)
        : underlying(move(underlying)) {
}

/**
 * Forward the sample to the wrapped consumer while holding `mutex`, so concurrent callers are serialized.
 */
void ThreadSafeSampleConsumer::onSample(SampleRecord const &sample) {
    std::unique_lock<std::mutex> lock(mutex);
    underlying->onSample(sample);
}

AddTimestampSampleConsumer::AddTimestampSampleConsumer(unique_ptr<SampleConsumer> underlying, KeyDictionary &dict,
                                                       const string &timestamp_name)
        : underlying_(move(underlying)), timestamp_key(dict.indexOf(timestamp_name)) {
}

/**
 * Forward the sample unchanged if it already has a value for the timestamp key. Otherwise forward a copy whose
 * timestamp is the current system_clock time, in whole seconds since the clock's epoch.
 */
void AddTimestampSampleConsumer::onSample(SampleRecord const &sample) {
    if (sample.at(timestamp_key)) {
        underlying_->onSample(sample);
        return;
    }

    auto time_since_epoch = system_clock::now().time_since_epoch();
    auto timestamp = duration_cast<seconds>(time_since_epoch).count();
    auto timestamp_s = std::to_string(timestamp);

    SampleRecord copy = sample;
    copy.set(timestamp_key, timestamp_s);

    underlying_->onSample(copy);
}

/**
 * Collect every non-empty sample into `samples`.
 */
void VectorSampleOutputStream::onSample(SampleRecord const &sample) {
    if (sample.empty()) {
        return;
    }

    samples.emplace_back(sample);
}

CsvWriterSampleConsumer::CsvWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict)
        : stream(move(stream)), headerWritten(false), dict(dict) {
}

/**
 * Write the sample as one CSV row, with one column per key in `dict` and an empty column for a missing value.
 *
 * If `dict` is empty when the first non-empty sample arrives, it is seeded with the keys that sample has values for.
 * The header row is written once, before the first row. Fields are quoted per RFC 4180 when needed.
 */
void CsvWriterSampleConsumer::onSample(SampleRecord const &sample) {
    // Skip empty records
    if (sample.empty()) {
        return;
    }

    // Build the dict with the keys from the first sample.
    if (dict.empty()) {
        SampleKeyIndex index = 0;
        auto ptr = sample.cbegin();
        while (ptr != sample.cend()) {
            auto o = *ptr;

            if (o) {
                auto name = sample.dict.at(index)->name;
                dict.indexOf(name);
            }

            ++ptr;
            ++index;
        }
    }

    if (!headerWritten) {
        writeHeader();
        headerWritten = true;
    }

    auto &s = *stream;

    bool first = true;
    for (auto &key: dict) {
        if (!first) {
            s << ",";
        }
        first = false;

        auto sampleKey = sample.dict.indexOf(key->name);
        auto o = sample.at(sampleKey);

        if (o) {
            s << escape_csv_field(o.value());
        }
    }

    s << endl;
}

/**
 * Write the header row: the names of all keys in `dict`, separated by commas.
 */
void CsvWriterSampleConsumer::writeHeader() {
    auto &s = *stream;

    bool first = true;
    for (auto &key: dict) {
        if (!first) {
            s << ",";
        }
        first = false;

        s << escape_csv_field(key->name);
    }

    s << endl;
}

JsonWriterSampleConsumer::JsonWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict)
        : dict(dict), stream(move(stream)) {
}

/**
 * Write the sample as a single-line JSON object.
 *
 * If `dict` is non-empty, only its keys are written. Otherwise every key with a value in the sample is written and
 * also registered in `dict`.
 */
void JsonWriterSampleConsumer::onSample(SampleRecord const &sample) {
    // Skip empty records
    if (sample.empty()) {
        return;
    }

    json doc({});
    if (!dict.empty()) {
        for (auto &key: dict) {
            auto sampleKey = sample.dict.indexOf(key->name);
            auto value = sample.at(sampleKey);

            if (value) {
                doc[key->name] = value.value();
            }
        }
    } else {
        for (auto &sampleKey: sample.dict) {
            auto o = sample.at(sampleKey);

            if (o) {
                // Make sure that the key is registered in the dictionary
                dict.indexOf(sampleKey->name);
                doc[sampleKey->name] = o.value();
            }
        }
    }

    *stream << doc << endl;
}

KeyValueWriterSampleConsumer::KeyValueWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict)
        : dict(dict), stream(move(stream)) {
}

/**
 * Write the sample as one "name=value, name=value" line.
 *
 * Key selection and dictionary registration follow the same rules as the JSON writer.
 */
void KeyValueWriterSampleConsumer::onSample(SampleRecord const &sample) {
    // Skip empty records
    if (sample.empty()) {
        return;
    }

    auto &s = *stream;

    bool first = true;
    if (!dict.empty()) {
        for (auto &key: dict) {
            auto sampleKey = sample.dict.indexOf(key->name);
            auto value = sample.at(sampleKey);

            if (value) {
                if (first) {
                    first = false;
                } else {
                    s << ", ";
                }
                s << key->name << "=" << value.value();
            }
        }
    } else {
        for (auto &sampleKey: sample.dict) {
            auto o = sample.at(sampleKey);

            if (o) {
                if (first) {
                    first = false;
                } else {
                    s << ", ";
                }
                // Make sure that the key is registered in the dictionary
                dict.indexOf(sampleKey->name);
                s << sampleKey->name << "=" << o.value();
            }
        }
    }

    s << endl;
}

/**
 * Select the RRD output columns: the keys named by `output_fields` if given, otherwise every key currently in `dict`.
 * Keys added to `dict` after construction are not picked up.
 */
RrdWriterSampleConsumer::RrdWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict,
                                                 const SampleKey *timestamp_key, o<output_fields_option *> output_fields)
        : stream(move(stream)), timestamp_key(timestamp_key) {

    if (output_fields) {
        for (const auto &field : output_fields.value()->fields) {
            keys.emplace_back(dict.indexOf(field));
        }
    } else {
        for (auto key : dict) {
            keys.emplace_back(key);
        }
    }
}

/**
 * Write "<timestamp>@<v1>:<v2>:..." for the selected keys, using "U" for a missing value. Samples without a timestamp
 * are dropped, and the timestamp key itself is not repeated among the values.
 *
 * NOTE(review): rrdtool documents '@' as the terminator for AT-style time specifications, while plain epoch seconds
 * are usually followed by ':' -- confirm that the consumer of this output accepts '@'.
 */
void RrdWriterSampleConsumer::onSample(SampleRecord const &sample) {
    // Skip empty records
    if (sample.empty()) {
        return;
    }

    auto &s = *stream;

    auto timestampO = sample.at(timestamp_key);

    if (!timestampO) {
        return;
    }

    auto timestamp = timestampO.value();

    s << timestamp;

    bool first = true;
    for (auto &key: keys) {
        if (key == timestamp_key) {
            continue;
        }

        auto value = sample.at(key);

        if (first) {
            s << "@";
            first = false;
        } else {
            s << ":";
        }

        s << (value ? value.value() : "U");
    }

    s << endl;
}

SqlWriterSampleConsumer::SqlWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name)
        : dict(dict), stream(move(stream)), table_name(move(table_name)) {
}

/**
 * Write the sample as one `INSERT INTO <table>(...) VALUES(...);` statement.
 *
 * Column names are emitted as double-quoted identifiers and values as single-quoted literals, with embedded quote
 * characters doubled. Key selection and dictionary registration follow the same rules as the JSON writer. Empty
 * samples, and samples with no value for any selected key, produce no output because an empty column list is not
 * valid SQL.
 */
void SqlWriterSampleConsumer::onSample(SampleRecord const &sample) {
    // Skip empty records, like the other writers do.
    if (sample.empty()) {
        return;
    }

    string fs, vs;
    fs.reserve(1024);
    vs.reserve(1024);

    bool first = true;
    auto add_column = [&](const string &name, const string &value) {
        if (first) {
            first = false;
        } else {
            fs += ", ";
            vs += ", ";
        }
        fs += quote_sql(name, '"');
        vs += quote_sql(value, '\'');
    };

    if (!dict.empty()) {
        for (auto &key: dict) {
            auto sample_key = sample.dict.indexOf(key->name);
            auto value = sample.at(sample_key);

            if (value) {
                add_column(key->name, value.value());
            }
        }
    } else {
        for (auto &sample_key: sample.dict) {
            auto o = sample.at(sample_key);

            if (o) {
                // Make sure that the key is registered in the dictionary
                dict.indexOf(sample_key->name);
                add_column(sample_key->name, o.value());
            }
        }
    }

    // No column had a value: "INSERT INTO t() VALUES()" would be invalid SQL.
    if (first) {
        return;
    }

    // NOTE(review): table_name is written verbatim (it may be schema-qualified); it is assumed to come from trusted
    // configuration rather than from the sample stream.
    *stream << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl;
}

/**
 * Feed raw bytes to the parser.
 *
 * NUL bytes are ignored. Every byte equal to `packet_delimiter` completes the current line, which is then parsed and
 * emitted. Bytes after the last delimiter stay buffered in `line` until the next call or finish().
 *
 * @return the number of lines parsed from this buffer.
 */
int KeyValueSampleStreamParser::process(mutable_buffers_1 &buffer) {

    size_t size = buffer_size(buffer);

    if (size == 0) {
        return 0;
    }

    auto data = boost::asio::buffer_cast<const uint8_t *>(buffer);

    int count = 0;
    for (size_t i = 0; i < size; i++) {
        uint8_t b = data[i];

        if (b == '\0') {
            continue;
        } else if (b == packet_delimiter) {
            process_line(line);
            count++;
            line = make_shared<vector<uint8_t>>();
        } else {
            line->push_back(b);
        }
    }

    return count;
}

/**
 * Parse and emit a trailing line that was not terminated by `packet_delimiter`.
 *
 * @return 1 if a pending line was parsed, 0 otherwise.
 */
int KeyValueSampleStreamParser::finish() {
    if (line->empty()) {
        return 0;
    }

    process_line(line);
    // Start a fresh line so a repeated finish(), or later process() calls, cannot re-emit this data.
    line = make_shared<vector<uint8_t>>();
    return 1;
}

/**
 * Parse one line of comma-separated `name=value` tokens into a sample and pass it to `output`.
 *
 * Names and values are whitespace-trimmed. Tokens without '=' or with an empty name are ignored. Every name is
 * registered in `dict_`.
 *
 * NOTE(review): boost's escaped_list_separator throws escaped_list_error on malformed escapes (e.g. a trailing '\'),
 * and that exception propagates out of process()/finish().
 */
void KeyValueSampleStreamParser::process_line(shared_ptr<vector<uint8_t>> &packet) {
    auto s = std::string(reinterpret_cast<const char *>(packet->data()), packet->size());

    using Tokenizer = tokenizer<escaped_list_separator<char>>;
    Tokenizer tokens(s);

    SampleRecord sample(dict_);

    for (const auto &token : tokens) {
        auto index = token.find('=');

        if (index == string::npos) {
            continue;
        }

        auto name = token.substr(0, index);
        boost::algorithm::trim(name);

        // A token like "=5" carries no key; registering "" would add a nameless column to every writer.
        if (name.empty()) {
            continue;
        }

        auto value = token.substr(index + 1);
        boost::algorithm::trim(value);

        auto key = dict_.indexOf(name);
        sample.set(key, value);
    }

    output->onSample(sample);
}

AutoSampleParser::AutoSampleParser(shared_ptr<SampleConsumer> output, KeyDictionary &dict) :
        SampleStreamParser(sample_format_type::AUTO, dict),
        keyValueParser(new KeyValueSampleStreamParser(output, dict)) {
    // Directly select the parser now until we have more than one parser
    parser = std::move(keyValueParser);
    type_ = sample_format_type::KEY_VALUE;
}

/**
 * Delegate to the selected parser.
 *
 * @throws runtime_error if no parser has been selected.
 */
int AutoSampleParser::process(mutable_buffers_1 &buffer) {
    if (parser) {
        return parser->process(buffer);
    } else {
        throw runtime_error("Not implemented yet");
    }
}

/**
 * Delegate to the selected parser.
 *
 * @throws runtime_error if no parser has been selected.
 */
int AutoSampleParser::finish() {
    if (parser) {
        return parser->finish();
    } else {
        throw runtime_error("Not implemented yet");
    }
}

} // namespace io
} // namespace sensor
} // namespace trygvis