diff options
Diffstat (limited to 'sensor/include/trygvis')
-rw-r--r-- | sensor/include/trygvis/sensor/io.h | 125 |
1 files changed, 57 insertions, 68 deletions
diff --git a/sensor/include/trygvis/sensor/io.h b/sensor/include/trygvis/sensor/io.h index 304091b..403af9b 100644 --- a/sensor/include/trygvis/sensor/io.h +++ b/sensor/include/trygvis/sensor/io.h @@ -4,6 +4,7 @@ #include <mutex> #include <boost/asio/buffer.hpp> +#include <utility> namespace trygvis { namespace sensor { @@ -13,47 +14,40 @@ using namespace std; using namespace boost::asio; struct sample_output_stream_option { - virtual ~sample_output_stream_option() { - }; + virtual ~sample_output_stream_option() = default;; }; struct output_fields_option : sample_output_stream_option { - ~output_fields_option() { - } + ~output_fields_option() override = default; vector<string> fields; }; struct timestamp_field_option : sample_output_stream_option { - timestamp_field_option(const string name) : name(name) { - } + explicit timestamp_field_option(string name) : name(std::move(name)) {} - ~timestamp_field_option() { - } + ~timestamp_field_option() override = default; const string name; }; class table_name_option : public sample_output_stream_option { public: - table_name_option(const string name) : name(name) { - } + explicit table_name_option(string name) : name(std::move(name)) {} - ~table_name_option() { - } + ~table_name_option() override = default; const string name; }; class sample_output_stream_options : public vector<sample_output_stream_option *> { public: - ~sample_output_stream_options() { - } + ~sample_output_stream_options() = default; template<typename T> o<T *> find_option() const { - for (auto it = begin(); it != end(); ++it) { - T *x = dynamic_cast<T *>(*it); + for (auto it : *this) { + auto *x = dynamic_cast<T *>(it); if (x != nullptr) { return o<T *>(x); @@ -65,90 +59,85 @@ public: }; struct missing_required_option_error : runtime_error { - missing_required_option_error(string what) : runtime_error(what) { - } + explicit missing_required_option_error(const string &what) : runtime_error(what) {} - ~missing_required_option_error() { - } + ~missing_required_option_error() override = default; }; class SampleStreamParser; -class SampleOutputStream; +class SampleConsumer; /** * Throws missing_required_option_error */ unique_ptr<SampleStreamParser> open_sample_stream_parser( - shared_ptr<SampleOutputStream> output, + shared_ptr<SampleConsumer> output, KeyDictionary &dict, sample_format_type type = sample_format_type::AUTO); /** * Throws missing_required_option_error */ -unique_ptr<SampleOutputStream> open_sample_output_stream( +unique_ptr<SampleConsumer> open_sample_writer( shared_ptr<ostream> output, KeyDictionary &dict, sample_format_type type, sample_output_stream_options options); static inline -unique_ptr<SampleOutputStream> open_sample_output_stream( +unique_ptr<SampleConsumer> open_sample_output_stream( shared_ptr<ostream> output, KeyDictionary &dict, sample_format_type type) { sample_output_stream_options options; - return open_sample_output_stream(output, dict, type, options); + return open_sample_writer(std::move(output), dict, type, options); } -class SampleOutputStream { +class SampleConsumer { public: - virtual void write(SampleRecord const &sample) = 0; + virtual void onSample(SampleRecord const &sample) = 0; }; -class VectorSampleOutputStream : public SampleOutputStream { +class VectorSampleOutputStream : public SampleConsumer { public: - virtual void write(SampleRecord const &sample) override; + void onSample(SampleRecord const &sample) override; -public: vector<SampleRecord> samples; }; -class ThreadSafeSampleOutputStream : public SampleOutputStream { +class ThreadSafeSampleConsumer : public SampleConsumer { public: - ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying); + explicit ThreadSafeSampleConsumer(unique_ptr<SampleConsumer> underlying); - ~ThreadSafeSampleOutputStream() { - } + ~ThreadSafeSampleConsumer() = default; - void write(SampleRecord const &sample) override; + void onSample(SampleRecord const &sample) override; private: - unique_ptr<SampleOutputStream> underlying; + unique_ptr<SampleConsumer> underlying; std::mutex mutex; }; -class AddTimestampSampleOutputStream : public SampleOutputStream { +class AddTimestampSampleConsumer : public SampleConsumer { public: - AddTimestampSampleOutputStream(unique_ptr<SampleOutputStream> underlying, KeyDictionary &dict, const string ×tamp_name); + AddTimestampSampleConsumer(unique_ptr<SampleConsumer> underlying, KeyDictionary &dict, const string ×tamp_name); - ~AddTimestampSampleOutputStream() { - } + ~AddTimestampSampleConsumer() = default; - void write(SampleRecord const &sample) override; + void onSample(SampleRecord const &sample) override; private: - unique_ptr<SampleOutputStream> underlying_; + unique_ptr<SampleConsumer> underlying_; const SampleKey* timestamp_key; }; -class CsvSampleOutputStream : public SampleOutputStream { +class CsvWriterSampleConsumer : public SampleConsumer { public: - CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict); + CsvWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict); - void write(SampleRecord const &sample); + void onSample(SampleRecord const &sample) override; private: void writeHeader(); @@ -158,33 +147,33 @@ private: bool headerWritten; }; -class JsonSampleOutputStream : public SampleOutputStream { +class JsonWriterSampleConsumer : public SampleConsumer { public: - JsonSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict); + JsonWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict); - void write(SampleRecord const &sample) override; + void onSample(SampleRecord const &sample) override; private: KeyDictionary &dict; shared_ptr<ostream> stream; }; -class KeyValueSampleOutputStream : public SampleOutputStream { +class KeyValueWriterSampleConsumer : public SampleConsumer { public: - KeyValueSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict); + KeyValueWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict); - void write(SampleRecord const &sample) override; + void onSample(SampleRecord const &sample) override; private: KeyDictionary &dict; shared_ptr<ostream> stream; }; -class RrdSampleOutputStream : public SampleOutputStream { +class RrdWriterSampleConsumer : public SampleConsumer { public: - RrdSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey *timestamp_key, o<output_fields_option *> output_fields); + RrdWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey *timestamp_key, o<output_fields_option *> output_fields); - void write(SampleRecord const &sample) override; + void onSample(SampleRecord const &sample) override; private: vector<const SampleKey *> keys; @@ -192,11 +181,11 @@ private: const SampleKey *timestamp_key; }; -class SqlSampleOutputStream : public SampleOutputStream { +class SqlWriterSampleConsumer : public SampleConsumer { public: - SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name); + SqlWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name); - void write(SampleRecord const &sample) override; + void onSample(SampleRecord const &sample) override; private: KeyDictionary &dict; @@ -235,8 +224,8 @@ protected: class KeyValueSampleStreamParser : public SampleStreamParser { public: - KeyValueSampleStreamParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) : - SampleStreamParser(sample_format_type::CSV, dict), output(output), + KeyValueSampleStreamParser(shared_ptr<SampleConsumer> output, KeyDictionary &dict) : + SampleStreamParser(sample_format_type::CSV, dict), output(std::move(output)), line(make_shared<vector<uint8_t>>()) { } @@ -248,17 +237,17 @@ private: void process_line(shared_ptr<vector<uint8_t>> &packet); static const uint8_t packet_delimiter = '\n'; - shared_ptr<SampleOutputStream> output; + shared_ptr<SampleConsumer> output; shared_ptr<vector<uint8_t>> line; }; class AutoSampleParser : public SampleStreamParser { public: - AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict); + AutoSampleParser(shared_ptr<SampleConsumer> output, KeyDictionary &dict); - virtual int process(mutable_buffers_1 &buffer) override; + int process(mutable_buffers_1 &buffer) override; - virtual int finish() override; + int finish() override; private: unique_ptr<SampleStreamParser> parser; @@ -266,10 +255,10 @@ private: }; static inline -unique_ptr<ThreadSafeSampleOutputStream> thread_safe_sample_output_stream(unique_ptr<SampleOutputStream> underlying) { - return make_unique<ThreadSafeSampleOutputStream>(move(underlying)); +unique_ptr<ThreadSafeSampleConsumer> thread_safe_sample_output_stream(unique_ptr<SampleConsumer> underlying) { + return make_unique<ThreadSafeSampleConsumer>(move(underlying)); }; -} -} -} +} // namespace io +} // namespace sensor +} // namespace trygvis |