#pragma once #include "trygvis/sensor.h" #include #include #include namespace trygvis { namespace sensor { namespace io { using namespace std; using namespace boost::asio; struct sample_output_stream_option { virtual ~sample_output_stream_option() = default;; }; struct output_fields_option : sample_output_stream_option { ~output_fields_option() override = default; vector fields; }; struct timestamp_field_option : sample_output_stream_option { explicit timestamp_field_option(string name) : name(std::move(name)) {} ~timestamp_field_option() override = default; const string name; }; class table_name_option : public sample_output_stream_option { public: explicit table_name_option(string name) : name(std::move(name)) {} ~table_name_option() override = default; const string name; }; class sample_output_stream_options : public vector { public: ~sample_output_stream_options() = default; template o find_option() const { for (auto it : *this) { auto *x = dynamic_cast(it); if (x != nullptr) { return o(x); } } return o(); } }; struct missing_required_option_error : runtime_error { explicit missing_required_option_error(const string &what) : runtime_error(what) {} ~missing_required_option_error() override = default; }; class SampleStreamParser; class SampleConsumer; /** * Throws missing_required_option_error */ unique_ptr open_sample_stream_parser( shared_ptr output, KeyDictionary &dict, sample_format_type type = sample_format_type::AUTO); /** * Throws missing_required_option_error */ unique_ptr open_sample_writer( shared_ptr output, KeyDictionary &dict, sample_format_type type, sample_output_stream_options options); static inline unique_ptr open_sample_output_stream( shared_ptr output, KeyDictionary &dict, sample_format_type type) { sample_output_stream_options options; return open_sample_writer(std::move(output), dict, type, options); } class SampleConsumer { public: virtual void onSample(SampleRecord const &sample) = 0; }; class VectorSampleOutputStream : public SampleConsumer { public: void onSample(SampleRecord const &sample) override; vector samples; }; class ThreadSafeSampleConsumer : public SampleConsumer { public: explicit ThreadSafeSampleConsumer(unique_ptr underlying); ~ThreadSafeSampleConsumer() = default; void onSample(SampleRecord const &sample) override; private: unique_ptr underlying; std::mutex mutex; }; class AddTimestampSampleConsumer : public SampleConsumer { public: AddTimestampSampleConsumer(unique_ptr underlying, KeyDictionary &dict, const string ×tamp_name); ~AddTimestampSampleConsumer() = default; void onSample(SampleRecord const &sample) override; private: unique_ptr underlying_; const SampleKey* timestamp_key; }; class CsvWriterSampleConsumer : public SampleConsumer { public: CsvWriterSampleConsumer(shared_ptr stream, KeyDictionary &dict); void onSample(SampleRecord const &sample) override; private: void writeHeader(); KeyDictionary &dict; shared_ptr stream; bool headerWritten; }; class JsonWriterSampleConsumer : public SampleConsumer { public: JsonWriterSampleConsumer(shared_ptr stream, KeyDictionary &dict); void onSample(SampleRecord const &sample) override; private: KeyDictionary &dict; shared_ptr stream; }; class KeyValueWriterSampleConsumer : public SampleConsumer { public: KeyValueWriterSampleConsumer(shared_ptr stream, KeyDictionary &dict); void onSample(SampleRecord const &sample) override; private: KeyDictionary &dict; shared_ptr stream; }; class RrdWriterSampleConsumer : public SampleConsumer { public: RrdWriterSampleConsumer(shared_ptr stream, KeyDictionary &dict, const SampleKey *timestamp_key, o output_fields); void onSample(SampleRecord const &sample) override; private: vector keys; shared_ptr stream; const SampleKey *timestamp_key; }; class SqlWriterSampleConsumer : public SampleConsumer { public: SqlWriterSampleConsumer(shared_ptr stream, KeyDictionary &dict, string table_name); void onSample(SampleRecord const &sample) override; private: KeyDictionary &dict; shared_ptr stream; const string table_name; }; class SampleStreamParser { public: virtual int process(mutable_buffers_1 &buffer) = 0; /** * Tells the parser that the data that has been written so far either "is it" (there is no more input) or that * it should just process what it has so far. It is ok to call process() after again after this method. * * TODO: should probably be renamed to "process()", "flush()" or "done()". */ virtual int finish() = 0; virtual sample_format_type type() { return type_; } virtual KeyDictionary &dict() { return dict_; } protected: sample_format_type type_; KeyDictionary &dict_; SampleStreamParser(const sample_format_type type, KeyDictionary &dict) : type_(type), dict_(dict) { } }; class KeyValueSampleStreamParser : public SampleStreamParser { public: KeyValueSampleStreamParser(shared_ptr output, KeyDictionary &dict) : SampleStreamParser(sample_format_type::CSV, dict), output(std::move(output)), line(make_shared>()) { } int process(mutable_buffers_1 &buffer) override; int finish() override; private: void process_line(shared_ptr> &packet); static const uint8_t packet_delimiter = '\n'; shared_ptr output; shared_ptr> line; }; class AutoSampleParser : public SampleStreamParser { public: AutoSampleParser(shared_ptr output, KeyDictionary &dict); int process(mutable_buffers_1 &buffer) override; int finish() override; private: unique_ptr parser; unique_ptr keyValueParser; }; static inline unique_ptr thread_safe_sample_output_stream(unique_ptr underlying) { return make_unique(move(underlying)); }; } // namespace io } // namespace sensor } // namespace trygvis