diff options
Diffstat (limited to 'sensor/main')
-rw-r--r-- | sensor/main/io.cpp | 56 |
1 files changed, 28 insertions, 28 deletions
diff --git a/sensor/main/io.cpp b/sensor/main/io.cpp index 9e822c3..a0dcdc9 100644 --- a/sensor/main/io.cpp +++ b/sensor/main/io.cpp @@ -16,7 +16,7 @@ using boost::escaped_list_separator; using json = nlohmann::json; unique_ptr<SampleStreamParser> open_sample_stream_parser( - shared_ptr<SampleOutputStream> output, + shared_ptr<SampleConsumer> output, KeyDictionary &dict, sample_format_type type) { if (type == sample_format_type::KEY_VALUE) { @@ -28,18 +28,18 @@ unique_ptr<SampleStreamParser> open_sample_stream_parser( } } -unique_ptr<SampleOutputStream> open_sample_output_stream( +unique_ptr<SampleConsumer> open_sample_writer( shared_ptr<ostream> output, KeyDictionary &dict, sample_format_type type, sample_output_stream_options options) { if (type == sample_format_type::CSV) { - return make_unique<CsvSampleOutputStream>(output, dict); + return make_unique<CsvWriterSampleConsumer>(output, dict); } else if (type == sample_format_type::KEY_VALUE) { - return make_unique<KeyValueSampleOutputStream>(output, dict); + return make_unique<KeyValueWriterSampleConsumer>(output, dict); } else if (type == sample_format_type::JSON) { - return make_unique<JsonSampleOutputStream>(output, dict); + return make_unique<JsonWriterSampleConsumer>(output, dict); } else if (type == sample_format_type::RRD) { auto of = options.find_option<output_fields_option>(); @@ -47,7 +47,7 @@ unique_ptr<SampleOutputStream> open_sample_output_stream( auto timestamp_key = dict.indexOf(tsf ? tsf.value()->name : "timestamp"); - return make_unique<RrdSampleOutputStream>(output, dict, timestamp_key, of); + return make_unique<RrdWriterSampleConsumer>(output, dict, timestamp_key, of); } else if (type == sample_format_type::SQL) { auto tno = options.find_option<table_name_option>(); @@ -55,30 +55,30 @@ unique_ptr<SampleOutputStream> open_sample_output_stream( throw missing_required_option_error("table name"); } - return make_unique<SqlSampleOutputStream>(move(output), dict, tno.value()->name); + return make_unique<SqlWriterSampleConsumer>(move(output), dict, tno.value()->name); } else { throw sample_exception("No writer for format type: " + to_string(type)); } } -ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying) +ThreadSafeSampleConsumer::ThreadSafeSampleConsumer(unique_ptr<SampleConsumer> underlying) : underlying(move(underlying)) { } -void ThreadSafeSampleOutputStream::write(SampleRecord const &sample) { +void ThreadSafeSampleConsumer::onSample(SampleRecord const &sample) { std::unique_lock<std::mutex> lock(mutex); - underlying->write(sample); + underlying->onSample(sample); } -AddTimestampSampleOutputStream::AddTimestampSampleOutputStream(unique_ptr<SampleOutputStream> underlying, +AddTimestampSampleConsumer::AddTimestampSampleConsumer(unique_ptr<SampleConsumer> underlying, KeyDictionary &dict, const string ×tamp_name) : underlying_(move(underlying)), timestamp_key(dict.indexOf(timestamp_name)) { } -void AddTimestampSampleOutputStream::write(SampleRecord const &sample) { +void AddTimestampSampleConsumer::onSample(SampleRecord const &sample) { if (sample.at(timestamp_key)) { - underlying_->write(sample); + underlying_->onSample(sample); return; } @@ -88,10 +88,10 @@ void AddTimestampSampleOutputStream::write(SampleRecord const &sample) { SampleRecord copy = sample; copy.set(timestamp_key, timestamp_s); - underlying_->write(copy); + underlying_->onSample(copy); } -void VectorSampleOutputStream::write(SampleRecord const &sample) { +void VectorSampleOutputStream::onSample(SampleRecord const &sample) { if (sample.empty()) { return; } @@ -99,11 +99,11 @@ void VectorSampleOutputStream::write(SampleRecord const &sample) { samples.emplace_back(sample); } -CsvSampleOutputStream::CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict) +CsvWriterSampleConsumer::CsvWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict) : stream(move(stream)), headerWritten(false), dict(dict) { } -void CsvSampleOutputStream::write(SampleRecord const &sample) { +void CsvWriterSampleConsumer::onSample(SampleRecord const &sample) { // Skip empty records if (sample.empty()) { return; @@ -151,7 +151,7 @@ void CsvSampleOutputStream::write(SampleRecord const &sample) { s << endl << flush; } -void CsvSampleOutputStream::writeHeader() { +void CsvWriterSampleConsumer::writeHeader() { auto &s = *stream; auto i = dict.begin(); @@ -168,11 +168,11 @@ void CsvSampleOutputStream::writeHeader() { s << endl << flush; } -JsonSampleOutputStream::JsonSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict) : +JsonWriterSampleConsumer::JsonWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict) : dict(dict), stream(move(stream)) { } -void JsonSampleOutputStream::write(SampleRecord const &sample) { +void JsonWriterSampleConsumer::onSample(SampleRecord const &sample) { // Skip empty records if (sample.empty()) { return; @@ -205,11 +205,11 @@ void JsonSampleOutputStream::write(SampleRecord const &sample) { *stream.get() << doc << endl << flush; } -KeyValueSampleOutputStream::KeyValueSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict) : +KeyValueWriterSampleConsumer::KeyValueWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict) : dict(dict), stream(move(stream)) { } -void KeyValueSampleOutputStream::write(SampleRecord const &sample) { +void KeyValueWriterSampleConsumer::onSample(SampleRecord const &sample) { // Skip empty records if (sample.empty()) { return; @@ -253,7 +253,7 @@ void KeyValueSampleOutputStream::write(SampleRecord const &sample) { *s << endl << flush; } -RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr<ostream> stream, +RrdWriterSampleConsumer::RrdWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey *timestamp_key, o<output_fields_option *> output_fields) : @@ -270,7 +270,7 @@ RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr<ostream> stream, } } -void RrdSampleOutputStream::write(SampleRecord const &sample) { +void RrdWriterSampleConsumer::onSample(SampleRecord const &sample) { // Skip empty records if (sample.empty()) { return; @@ -309,11 +309,11 @@ void RrdSampleOutputStream::write(SampleRecord const &sample) { s << endl << flush; } -SqlSampleOutputStream::SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name) : +SqlWriterSampleConsumer::SqlWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name) : dict(dict), stream(move(stream)), table_name(table_name) { } -void SqlSampleOutputStream::write(SampleRecord const &sample) { +void SqlWriterSampleConsumer::onSample(SampleRecord const &sample) { string fs, vs; fs.reserve(1024); @@ -425,10 +425,10 @@ void KeyValueSampleStreamParser::process_line(shared_ptr<vector<uint8_t>> &packe sample.set(key, value); } - output->write(sample); + output->onSample(sample); } -AutoSampleParser::AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) : +AutoSampleParser::AutoSampleParser(shared_ptr<SampleConsumer> output, KeyDictionary &dict) : SampleStreamParser(sample_format_type::AUTO, dict), keyValueParser(new KeyValueSampleStreamParser(output, dict)) { // Directly select the parser now until we have more than one parser parser = std::move(keyValueParser); |