#pragma once #include "trygvis/sensor.h" #include #include namespace trygvis { namespace sensor { namespace io { using namespace std; using namespace boost::asio; struct sample_output_stream_option { virtual ~sample_output_stream_option() { }; }; struct output_fields_option : sample_output_stream_option { ~output_fields_option() { } vector fields; }; struct timestamp_field_option : sample_output_stream_option { timestamp_field_option(const string name) : name(name) { } ~timestamp_field_option() { } const string name; }; class table_name_option : public sample_output_stream_option { public: table_name_option(const string name) : name(name) { } ~table_name_option() { } const string name; }; class sample_output_stream_options : public vector { public: ~sample_output_stream_options() { } template o find_option() const { for (auto it = begin(); it != end(); ++it) { T *x = dynamic_cast(*it); if (x != nullptr) { return o(x); } } return o(); } }; struct missing_required_option_error : runtime_error { missing_required_option_error(string what) : runtime_error(what) { } ~missing_required_option_error() { } }; class SampleStreamParser; class SampleOutputStream; /** * Throws missing_required_option_error */ unique_ptr open_sample_stream_parser( shared_ptr output, KeyDictionary &dict, sample_format_type type = sample_format_type::AUTO); /** * Throws missing_required_option_error */ unique_ptr open_sample_output_stream( shared_ptr output, KeyDictionary &dict, sample_format_type type, sample_output_stream_options options); static inline unique_ptr open_sample_output_stream( shared_ptr output, KeyDictionary &dict, sample_format_type type) { sample_output_stream_options options; return open_sample_output_stream(output, dict, type, options); } class SampleOutputStream { public: virtual void write(SampleRecord const &sample) = 0; }; class VectorSampleOutputStream : public SampleOutputStream { public: virtual void write(SampleRecord const &sample) override; public: vector samples; }; class ThreadSafeSampleOutputStream : public SampleOutputStream { public: ThreadSafeSampleOutputStream(unique_ptr underlying); ~ThreadSafeSampleOutputStream() { } void write(SampleRecord const &sample) override; private: unique_ptr underlying; std::mutex mutex; }; class AddTimestampSampleOutputStream : public SampleOutputStream { public: AddTimestampSampleOutputStream(unique_ptr underlying, KeyDictionary &dict, const string ×tamp_name); ~AddTimestampSampleOutputStream() { } void write(SampleRecord const &sample) override; private: unique_ptr underlying_; const SampleKey* timestamp_key; }; class CsvSampleOutputStream : public SampleOutputStream { public: CsvSampleOutputStream(shared_ptr stream, KeyDictionary &dict); void write(SampleRecord const &sample); private: void writeHeader(); KeyDictionary &dict; shared_ptr stream; bool headerWritten; }; class JsonSampleOutputStream : public SampleOutputStream { public: JsonSampleOutputStream(shared_ptr stream, KeyDictionary &dict); void write(SampleRecord const &sample) override; private: KeyDictionary &dict; shared_ptr stream; }; class KeyValueSampleOutputStream : public SampleOutputStream { public: KeyValueSampleOutputStream(shared_ptr stream, KeyDictionary &dict); void write(SampleRecord const &sample) override; private: KeyDictionary &dict; shared_ptr stream; }; class RrdSampleOutputStream : public SampleOutputStream { public: RrdSampleOutputStream(shared_ptr stream, KeyDictionary &dict, const SampleKey *timestamp_key, o output_fields); void write(SampleRecord const &sample) override; private: vector keys; shared_ptr stream; const SampleKey *timestamp_key; }; class SqlSampleOutputStream : public SampleOutputStream { public: SqlSampleOutputStream(shared_ptr stream, KeyDictionary &dict, string table_name); void write(SampleRecord const &sample) override; private: KeyDictionary &dict; shared_ptr stream; const string table_name; }; class SampleStreamParser { public: virtual int process(mutable_buffers_1 &buffer) = 0; virtual int finish() = 0; virtual sample_format_type type() { return type_; } virtual KeyDictionary &dict() { return dict_; } protected: sample_format_type type_; KeyDictionary &dict_; SampleStreamParser(const sample_format_type type, KeyDictionary &dict) : type_(type), dict_(dict) { } }; class KeyValueSampleStreamParser : public SampleStreamParser { public: KeyValueSampleStreamParser(shared_ptr output, KeyDictionary &dict) : SampleStreamParser(sample_format_type::CSV, dict), output(output), line(make_shared>()) { } int process(mutable_buffers_1 &buffer) override; int finish() override; private: void process_line(shared_ptr> &packet); static const uint8_t packet_delimiter = '\n'; shared_ptr output; shared_ptr> line; }; class AutoSampleParser : public SampleStreamParser { public: AutoSampleParser(shared_ptr output, KeyDictionary &dict); virtual int process(mutable_buffers_1 &buffer) override; virtual int finish() override; private: unique_ptr parser; unique_ptr keyValueParser; }; static inline unique_ptr thread_safe_sample_output_stream(unique_ptr underlying) { return make_unique(move(underlying)); }; } } }