From 1f2c7aae9fdd39a478944ccda5c9e82d76ab5db6 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Fri, 1 Sep 2017 13:59:25 +0200 Subject: o Renaming SampleOutputStream to SampleConsumer. o Some c++ style fixes. --- sensor/include/trygvis/sensor/io.h | 125 +++++++++++++++++-------------------- sensor/main/io.cpp | 56 ++++++++--------- 2 files changed, 85 insertions(+), 96 deletions(-) (limited to 'sensor') diff --git a/sensor/include/trygvis/sensor/io.h b/sensor/include/trygvis/sensor/io.h index 304091b..403af9b 100644 --- a/sensor/include/trygvis/sensor/io.h +++ b/sensor/include/trygvis/sensor/io.h @@ -4,6 +4,7 @@ #include #include +#include namespace trygvis { namespace sensor { @@ -13,47 +14,40 @@ using namespace std; using namespace boost::asio; struct sample_output_stream_option { - virtual ~sample_output_stream_option() { - }; + virtual ~sample_output_stream_option() = default;; }; struct output_fields_option : sample_output_stream_option { - ~output_fields_option() { - } + ~output_fields_option() override = default; vector fields; }; struct timestamp_field_option : sample_output_stream_option { - timestamp_field_option(const string name) : name(name) { - } + explicit timestamp_field_option(string name) : name(std::move(name)) {} - ~timestamp_field_option() { - } + ~timestamp_field_option() override = default; const string name; }; class table_name_option : public sample_output_stream_option { public: - table_name_option(const string name) : name(name) { - } + explicit table_name_option(string name) : name(std::move(name)) {} - ~table_name_option() { - } + ~table_name_option() override = default; const string name; }; class sample_output_stream_options : public vector { public: - ~sample_output_stream_options() { - } + ~sample_output_stream_options() = default; template o find_option() const { - for (auto it = begin(); it != end(); ++it) { - T *x = dynamic_cast(*it); + for (auto it : *this) { + auto *x = dynamic_cast(it); if (x != nullptr) { return o(x); @@ -65,90 +59,85 @@ public: }; struct missing_required_option_error : runtime_error { - missing_required_option_error(string what) : runtime_error(what) { - } + explicit missing_required_option_error(const string &what) : runtime_error(what) {} - ~missing_required_option_error() { - } + ~missing_required_option_error() override = default; }; class SampleStreamParser; -class SampleOutputStream; +class SampleConsumer; /** * Throws missing_required_option_error */ unique_ptr open_sample_stream_parser( - shared_ptr output, + shared_ptr output, KeyDictionary &dict, sample_format_type type = sample_format_type::AUTO); /** * Throws missing_required_option_error */ -unique_ptr open_sample_output_stream( +unique_ptr open_sample_writer( shared_ptr output, KeyDictionary &dict, sample_format_type type, sample_output_stream_options options); static inline -unique_ptr open_sample_output_stream( +unique_ptr open_sample_output_stream( shared_ptr output, KeyDictionary &dict, sample_format_type type) { sample_output_stream_options options; - return open_sample_output_stream(output, dict, type, options); + return open_sample_writer(std::move(output), dict, type, options); } -class SampleOutputStream { +class SampleConsumer { public: - virtual void write(SampleRecord const &sample) = 0; + virtual void onSample(SampleRecord const &sample) = 0; }; -class VectorSampleOutputStream : public SampleOutputStream { +class VectorSampleOutputStream : public SampleConsumer { public: - virtual void write(SampleRecord const &sample) override; + void onSample(SampleRecord const &sample) override; -public: vector samples; }; -class ThreadSafeSampleOutputStream : public SampleOutputStream { +class ThreadSafeSampleConsumer : public SampleConsumer { public: - ThreadSafeSampleOutputStream(unique_ptr underlying); + explicit ThreadSafeSampleConsumer(unique_ptr underlying); - ~ThreadSafeSampleOutputStream() { - } + ~ThreadSafeSampleConsumer() = default; - void write(SampleRecord const &sample) override; + void onSample(SampleRecord const &sample) override; private: - unique_ptr underlying; + unique_ptr underlying; std::mutex mutex; }; -class AddTimestampSampleOutputStream : public SampleOutputStream { +class AddTimestampSampleConsumer : public SampleConsumer { public: - AddTimestampSampleOutputStream(unique_ptr underlying, KeyDictionary &dict, const string ×tamp_name); + AddTimestampSampleConsumer(unique_ptr underlying, KeyDictionary &dict, const string ×tamp_name); - ~AddTimestampSampleOutputStream() { - } + ~AddTimestampSampleConsumer() = default; - void write(SampleRecord const &sample) override; + void onSample(SampleRecord const &sample) override; private: - unique_ptr underlying_; + unique_ptr underlying_; const SampleKey* timestamp_key; }; -class CsvSampleOutputStream : public SampleOutputStream { +class CsvWriterSampleConsumer : public SampleConsumer { public: - CsvSampleOutputStream(shared_ptr stream, KeyDictionary &dict); + CsvWriterSampleConsumer(shared_ptr stream, KeyDictionary &dict); - void write(SampleRecord const &sample); + void onSample(SampleRecord const &sample) override; private: void writeHeader(); @@ -158,33 +147,33 @@ private: bool headerWritten; }; -class JsonSampleOutputStream : public SampleOutputStream { +class JsonWriterSampleConsumer : public SampleConsumer { public: - JsonSampleOutputStream(shared_ptr stream, KeyDictionary &dict); + JsonWriterSampleConsumer(shared_ptr stream, KeyDictionary &dict); - void write(SampleRecord const &sample) override; + void onSample(SampleRecord const &sample) override; private: KeyDictionary &dict; shared_ptr stream; }; -class KeyValueSampleOutputStream : public SampleOutputStream { +class KeyValueWriterSampleConsumer : public SampleConsumer { public: - KeyValueSampleOutputStream(shared_ptr stream, KeyDictionary &dict); + KeyValueWriterSampleConsumer(shared_ptr stream, KeyDictionary &dict); - void write(SampleRecord const &sample) override; + void onSample(SampleRecord const &sample) override; private: KeyDictionary &dict; shared_ptr stream; }; -class RrdSampleOutputStream : public SampleOutputStream { +class RrdWriterSampleConsumer : public SampleConsumer { public: - RrdSampleOutputStream(shared_ptr stream, KeyDictionary &dict, const SampleKey *timestamp_key, o output_fields); + RrdWriterSampleConsumer(shared_ptr stream, KeyDictionary &dict, const SampleKey *timestamp_key, o output_fields); - void write(SampleRecord const &sample) override; + void onSample(SampleRecord const &sample) override; private: vector keys; @@ -192,11 +181,11 @@ private: const SampleKey *timestamp_key; }; -class SqlSampleOutputStream : public SampleOutputStream { +class SqlWriterSampleConsumer : public SampleConsumer { public: - SqlSampleOutputStream(shared_ptr stream, KeyDictionary &dict, string table_name); + SqlWriterSampleConsumer(shared_ptr stream, KeyDictionary &dict, string table_name); - void write(SampleRecord const &sample) override; + void onSample(SampleRecord const &sample) override; private: KeyDictionary &dict; @@ -235,8 +224,8 @@ protected: class KeyValueSampleStreamParser : public SampleStreamParser { public: - KeyValueSampleStreamParser(shared_ptr output, KeyDictionary &dict) : - SampleStreamParser(sample_format_type::CSV, dict), output(output), + KeyValueSampleStreamParser(shared_ptr output, KeyDictionary &dict) : + SampleStreamParser(sample_format_type::CSV, dict), output(std::move(output)), line(make_shared>()) { } @@ -248,17 +237,17 @@ private: void process_line(shared_ptr> &packet); static const uint8_t packet_delimiter = '\n'; - shared_ptr output; + shared_ptr output; shared_ptr> line; }; class AutoSampleParser : public SampleStreamParser { public: - AutoSampleParser(shared_ptr output, KeyDictionary &dict); + AutoSampleParser(shared_ptr output, KeyDictionary &dict); - virtual int process(mutable_buffers_1 &buffer) override; + int process(mutable_buffers_1 &buffer) override; - virtual int finish() override; + int finish() override; private: unique_ptr parser; @@ -266,10 +255,10 @@ private: }; static inline -unique_ptr thread_safe_sample_output_stream(unique_ptr underlying) { - return make_unique(move(underlying)); +unique_ptr thread_safe_sample_output_stream(unique_ptr underlying) { + return make_unique(move(underlying)); }; -} -} -} +} // namespace io +} // namespace sensor +} // namespace trygvis diff --git a/sensor/main/io.cpp b/sensor/main/io.cpp index 9e822c3..a0dcdc9 100644 --- a/sensor/main/io.cpp +++ b/sensor/main/io.cpp @@ -16,7 +16,7 @@ using boost::escaped_list_separator; using json = nlohmann::json; unique_ptr open_sample_stream_parser( - shared_ptr output, + shared_ptr output, KeyDictionary &dict, sample_format_type type) { if (type == sample_format_type::KEY_VALUE) { @@ -28,18 +28,18 @@ unique_ptr open_sample_stream_parser( } } -unique_ptr open_sample_output_stream( +unique_ptr open_sample_writer( shared_ptr output, KeyDictionary &dict, sample_format_type type, sample_output_stream_options options) { if (type == sample_format_type::CSV) { - return make_unique(output, dict); + return make_unique(output, dict); } else if (type == sample_format_type::KEY_VALUE) { - return make_unique(output, dict); + return make_unique(output, dict); } else if (type == sample_format_type::JSON) { - return make_unique(output, dict); + return make_unique(output, dict); } else if (type == sample_format_type::RRD) { auto of = options.find_option(); @@ -47,7 +47,7 @@ unique_ptr open_sample_output_stream( auto timestamp_key = dict.indexOf(tsf ? tsf.value()->name : "timestamp"); - return make_unique(output, dict, timestamp_key, of); + return make_unique(output, dict, timestamp_key, of); } else if (type == sample_format_type::SQL) { auto tno = options.find_option(); @@ -55,30 +55,30 @@ unique_ptr open_sample_output_stream( throw missing_required_option_error("table name"); } - return make_unique(move(output), dict, tno.value()->name); + return make_unique(move(output), dict, tno.value()->name); } else { throw sample_exception("No writer for format type: " + to_string(type)); } } -ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr underlying) +ThreadSafeSampleConsumer::ThreadSafeSampleConsumer(unique_ptr underlying) : underlying(move(underlying)) { } -void ThreadSafeSampleOutputStream::write(SampleRecord const &sample) { +void ThreadSafeSampleConsumer::onSample(SampleRecord const &sample) { std::unique_lock lock(mutex); - underlying->write(sample); + underlying->onSample(sample); } -AddTimestampSampleOutputStream::AddTimestampSampleOutputStream(unique_ptr underlying, +AddTimestampSampleConsumer::AddTimestampSampleConsumer(unique_ptr underlying, KeyDictionary &dict, const string ×tamp_name) : underlying_(move(underlying)), timestamp_key(dict.indexOf(timestamp_name)) { } -void AddTimestampSampleOutputStream::write(SampleRecord const &sample) { +void AddTimestampSampleConsumer::onSample(SampleRecord const &sample) { if (sample.at(timestamp_key)) { - underlying_->write(sample); + underlying_->onSample(sample); return; } @@ -88,10 +88,10 @@ void AddTimestampSampleOutputStream::write(SampleRecord const &sample) { SampleRecord copy = sample; copy.set(timestamp_key, timestamp_s); - underlying_->write(copy); + underlying_->onSample(copy); } -void VectorSampleOutputStream::write(SampleRecord const &sample) { +void VectorSampleOutputStream::onSample(SampleRecord const &sample) { if (sample.empty()) { return; } @@ -99,11 +99,11 @@ void VectorSampleOutputStream::write(SampleRecord const &sample) { samples.emplace_back(sample); } -CsvSampleOutputStream::CsvSampleOutputStream(shared_ptr stream, KeyDictionary &dict) +CsvWriterSampleConsumer::CsvWriterSampleConsumer(shared_ptr stream, KeyDictionary &dict) : stream(move(stream)), headerWritten(false), dict(dict) { } -void CsvSampleOutputStream::write(SampleRecord const &sample) { +void CsvWriterSampleConsumer::onSample(SampleRecord const &sample) { // Skip empty records if (sample.empty()) { return; @@ -151,7 +151,7 @@ void CsvSampleOutputStream::write(SampleRecord const &sample) { s << endl << flush; } -void CsvSampleOutputStream::writeHeader() { +void CsvWriterSampleConsumer::writeHeader() { auto &s = *stream; auto i = dict.begin(); @@ -168,11 +168,11 @@ void CsvSampleOutputStream::writeHeader() { s << endl << flush; } -JsonSampleOutputStream::JsonSampleOutputStream(shared_ptr stream, KeyDictionary &dict) : +JsonWriterSampleConsumer::JsonWriterSampleConsumer(shared_ptr stream, KeyDictionary &dict) : dict(dict), stream(move(stream)) { } -void JsonSampleOutputStream::write(SampleRecord const &sample) { +void JsonWriterSampleConsumer::onSample(SampleRecord const &sample) { // Skip empty records if (sample.empty()) { return; @@ -205,11 +205,11 @@ void JsonSampleOutputStream::write(SampleRecord const &sample) { *stream.get() << doc << endl << flush; } -KeyValueSampleOutputStream::KeyValueSampleOutputStream(shared_ptr stream, KeyDictionary &dict) : +KeyValueWriterSampleConsumer::KeyValueWriterSampleConsumer(shared_ptr stream, KeyDictionary &dict) : dict(dict), stream(move(stream)) { } -void KeyValueSampleOutputStream::write(SampleRecord const &sample) { +void KeyValueWriterSampleConsumer::onSample(SampleRecord const &sample) { // Skip empty records if (sample.empty()) { return; @@ -253,7 +253,7 @@ void KeyValueSampleOutputStream::write(SampleRecord const &sample) { *s << endl << flush; } -RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr stream, +RrdWriterSampleConsumer::RrdWriterSampleConsumer(shared_ptr stream, KeyDictionary &dict, const SampleKey *timestamp_key, o output_fields) : @@ -270,7 +270,7 @@ RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr stream, } } -void RrdSampleOutputStream::write(SampleRecord const &sample) { +void RrdWriterSampleConsumer::onSample(SampleRecord const &sample) { // Skip empty records if (sample.empty()) { return; @@ -309,11 +309,11 @@ void RrdSampleOutputStream::write(SampleRecord const &sample) { s << endl << flush; } -SqlSampleOutputStream::SqlSampleOutputStream(shared_ptr stream, KeyDictionary &dict, string table_name) : +SqlWriterSampleConsumer::SqlWriterSampleConsumer(shared_ptr stream, KeyDictionary &dict, string table_name) : dict(dict), stream(move(stream)), table_name(table_name) { } -void SqlSampleOutputStream::write(SampleRecord const &sample) { +void SqlWriterSampleConsumer::onSample(SampleRecord const &sample) { string fs, vs; fs.reserve(1024); @@ -425,10 +425,10 @@ void KeyValueSampleStreamParser::process_line(shared_ptr> &packe sample.set(key, value); } - output->write(sample); + output->onSample(sample); } -AutoSampleParser::AutoSampleParser(shared_ptr output, KeyDictionary &dict) : +AutoSampleParser::AutoSampleParser(shared_ptr output, KeyDictionary &dict) : SampleStreamParser(sample_format_type::AUTO, dict), keyValueParser(new KeyValueSampleStreamParser(output, dict)) { // Directly select the parser now until we have more than one parser parser = std::move(keyValueParser); -- cgit v1.2.3