aboutsummaryrefslogtreecommitdiff
path: root/sensor/main
diff options
context:
space:
mode:
Diffstat (limited to 'sensor/main')
-rw-r--r--sensor/main/io.cpp56
1 files changed, 28 insertions, 28 deletions
diff --git a/sensor/main/io.cpp b/sensor/main/io.cpp
index 9e822c3..a0dcdc9 100644
--- a/sensor/main/io.cpp
+++ b/sensor/main/io.cpp
@@ -16,7 +16,7 @@ using boost::escaped_list_separator;
using json = nlohmann::json;
unique_ptr<SampleStreamParser> open_sample_stream_parser(
- shared_ptr<SampleOutputStream> output,
+ shared_ptr<SampleConsumer> output,
KeyDictionary &dict,
sample_format_type type) {
if (type == sample_format_type::KEY_VALUE) {
@@ -28,18 +28,18 @@ unique_ptr<SampleStreamParser> open_sample_stream_parser(
}
}
-unique_ptr<SampleOutputStream> open_sample_output_stream(
+unique_ptr<SampleConsumer> open_sample_writer(
shared_ptr<ostream> output,
KeyDictionary &dict,
sample_format_type type,
sample_output_stream_options options) {
if (type == sample_format_type::CSV) {
- return make_unique<CsvSampleOutputStream>(output, dict);
+ return make_unique<CsvWriterSampleConsumer>(output, dict);
} else if (type == sample_format_type::KEY_VALUE) {
- return make_unique<KeyValueSampleOutputStream>(output, dict);
+ return make_unique<KeyValueWriterSampleConsumer>(output, dict);
} else if (type == sample_format_type::JSON) {
- return make_unique<JsonSampleOutputStream>(output, dict);
+ return make_unique<JsonWriterSampleConsumer>(output, dict);
} else if (type == sample_format_type::RRD) {
auto of = options.find_option<output_fields_option>();
@@ -47,7 +47,7 @@ unique_ptr<SampleOutputStream> open_sample_output_stream(
auto timestamp_key = dict.indexOf(tsf ? tsf.value()->name : "timestamp");
- return make_unique<RrdSampleOutputStream>(output, dict, timestamp_key, of);
+ return make_unique<RrdWriterSampleConsumer>(output, dict, timestamp_key, of);
} else if (type == sample_format_type::SQL) {
auto tno = options.find_option<table_name_option>();
@@ -55,30 +55,30 @@ unique_ptr<SampleOutputStream> open_sample_output_stream(
throw missing_required_option_error("table name");
}
- return make_unique<SqlSampleOutputStream>(move(output), dict, tno.value()->name);
+ return make_unique<SqlWriterSampleConsumer>(move(output), dict, tno.value()->name);
} else {
throw sample_exception("No writer for format type: " + to_string(type));
}
}
-ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying)
+ThreadSafeSampleConsumer::ThreadSafeSampleConsumer(unique_ptr<SampleConsumer> underlying)
: underlying(move(underlying)) {
}
-void ThreadSafeSampleOutputStream::write(SampleRecord const &sample) {
+void ThreadSafeSampleConsumer::onSample(SampleRecord const &sample) {
std::unique_lock<std::mutex> lock(mutex);
- underlying->write(sample);
+ underlying->onSample(sample);
}
-AddTimestampSampleOutputStream::AddTimestampSampleOutputStream(unique_ptr<SampleOutputStream> underlying,
+AddTimestampSampleConsumer::AddTimestampSampleConsumer(unique_ptr<SampleConsumer> underlying,
KeyDictionary &dict,
const string &timestamp_name) : underlying_(move(underlying)), timestamp_key(dict.indexOf(timestamp_name)) {
}
-void AddTimestampSampleOutputStream::write(SampleRecord const &sample) {
+void AddTimestampSampleConsumer::onSample(SampleRecord const &sample) {
if (sample.at(timestamp_key)) {
- underlying_->write(sample);
+ underlying_->onSample(sample);
return;
}
@@ -88,10 +88,10 @@ void AddTimestampSampleOutputStream::write(SampleRecord const &sample) {
SampleRecord copy = sample;
copy.set(timestamp_key, timestamp_s);
- underlying_->write(copy);
+ underlying_->onSample(copy);
}
-void VectorSampleOutputStream::write(SampleRecord const &sample) {
+void VectorSampleOutputStream::onSample(SampleRecord const &sample) {
if (sample.empty()) {
return;
}
@@ -99,11 +99,11 @@ void VectorSampleOutputStream::write(SampleRecord const &sample) {
samples.emplace_back(sample);
}
-CsvSampleOutputStream::CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict)
+CsvWriterSampleConsumer::CsvWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict)
: stream(move(stream)), headerWritten(false), dict(dict) {
}
-void CsvSampleOutputStream::write(SampleRecord const &sample) {
+void CsvWriterSampleConsumer::onSample(SampleRecord const &sample) {
// Skip empty records
if (sample.empty()) {
return;
@@ -151,7 +151,7 @@ void CsvSampleOutputStream::write(SampleRecord const &sample) {
s << endl << flush;
}
-void CsvSampleOutputStream::writeHeader() {
+void CsvWriterSampleConsumer::writeHeader() {
auto &s = *stream;
auto i = dict.begin();
@@ -168,11 +168,11 @@ void CsvSampleOutputStream::writeHeader() {
s << endl << flush;
}
-JsonSampleOutputStream::JsonSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict) :
+JsonWriterSampleConsumer::JsonWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict) :
dict(dict), stream(move(stream)) {
}
-void JsonSampleOutputStream::write(SampleRecord const &sample) {
+void JsonWriterSampleConsumer::onSample(SampleRecord const &sample) {
// Skip empty records
if (sample.empty()) {
return;
@@ -205,11 +205,11 @@ void JsonSampleOutputStream::write(SampleRecord const &sample) {
*stream.get() << doc << endl << flush;
}
-KeyValueSampleOutputStream::KeyValueSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict) :
+KeyValueWriterSampleConsumer::KeyValueWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict) :
dict(dict), stream(move(stream)) {
}
-void KeyValueSampleOutputStream::write(SampleRecord const &sample) {
+void KeyValueWriterSampleConsumer::onSample(SampleRecord const &sample) {
// Skip empty records
if (sample.empty()) {
return;
@@ -253,7 +253,7 @@ void KeyValueSampleOutputStream::write(SampleRecord const &sample) {
*s << endl << flush;
}
-RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr<ostream> stream,
+RrdWriterSampleConsumer::RrdWriterSampleConsumer(shared_ptr<ostream> stream,
KeyDictionary &dict,
const SampleKey *timestamp_key,
o<output_fields_option *> output_fields) :
@@ -270,7 +270,7 @@ RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr<ostream> stream,
}
}
-void RrdSampleOutputStream::write(SampleRecord const &sample) {
+void RrdWriterSampleConsumer::onSample(SampleRecord const &sample) {
// Skip empty records
if (sample.empty()) {
return;
@@ -309,11 +309,11 @@ void RrdSampleOutputStream::write(SampleRecord const &sample) {
s << endl << flush;
}
-SqlSampleOutputStream::SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name) :
+SqlWriterSampleConsumer::SqlWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name) :
dict(dict), stream(move(stream)), table_name(table_name) {
}
-void SqlSampleOutputStream::write(SampleRecord const &sample) {
+void SqlWriterSampleConsumer::onSample(SampleRecord const &sample) {
string fs, vs;
fs.reserve(1024);
@@ -425,10 +425,10 @@ void KeyValueSampleStreamParser::process_line(shared_ptr<vector<uint8_t>> &packe
sample.set(key, value);
}
- output->write(sample);
+ output->onSample(sample);
}
-AutoSampleParser::AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) :
+AutoSampleParser::AutoSampleParser(shared_ptr<SampleConsumer> output, KeyDictionary &dict) :
SampleStreamParser(sample_format_type::AUTO, dict), keyValueParser(new KeyValueSampleStreamParser(output, dict)) {
// Directly select the parser now until we have more than one parser
parser = std::move(keyValueParser);