diff options
Diffstat (limited to 'sensor/include/trygvis/sensor/io.h')
-rw-r--r-- | sensor/include/trygvis/sensor/io.h | 210 |
1 files changed, 210 insertions, 0 deletions
diff --git a/sensor/include/trygvis/sensor/io.h b/sensor/include/trygvis/sensor/io.h new file mode 100644 index 0000000..7db7615 --- /dev/null +++ b/sensor/include/trygvis/sensor/io.h @@ -0,0 +1,210 @@ +#pragma once + +#include "trygvis/sensor.h" + +#include <mutex> +#include "boost/asio/buffer.hpp" + +namespace trygvis { +namespace sensor { +namespace io { + +using namespace std; +using namespace boost::asio; + +class output_fields; + +class timestamp_field; + +class SampleOutputStream { +public: + virtual void write(SampleRecord const &sample) = 0; +}; + +class VectorSampleOutputStream : public SampleOutputStream { + +public: + virtual void write(SampleRecord const &sample) override; + +public: + vector<SampleRecord> samples; +}; + +class ThreadSafeSampleOutputStream : public SampleOutputStream { +public: + ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying); + + ~ThreadSafeSampleOutputStream() { + } + + void write(SampleRecord const &sample) override; + +private: + unique_ptr<SampleOutputStream> underlying; + std::mutex mutex; +}; + +class CsvSampleOutputStream : public SampleOutputStream { +public: + CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict); + + void write(SampleRecord const &sample); + + const KeyDictionary &getDict() { + return dict; + } + +private: + void writeHeader(); + + KeyDictionary &dict; + shared_ptr<ostream> stream; + bool headerWritten; +}; + +class JsonSampleOutputStream : public SampleOutputStream { +public: + JsonSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict); + + void write(SampleRecord const &sample) override; + +private: + KeyDictionary &dict; + shared_ptr<ostream> stream; +}; + +class KeyValueSampleOutputStream : public SampleOutputStream { +public: + KeyValueSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict); + + void write(SampleRecord const &sample) override; + +private: + KeyDictionary &dict; + shared_ptr<ostream> stream; +}; + +class RrdSampleOutputStream : public SampleOutputStream { +public: + RrdSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey *timestamp_key, o<output_fields *> output_fields); + + void write(SampleRecord const &sample) override; + +private: + vector<SampleKey *> keys; + shared_ptr<ostream> stream; + const SampleKey *timestamp_key; +}; + +class SqlSampleOutputStream : public SampleOutputStream { +public: + SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name); + + void write(SampleRecord const &sample) override; + +private: + KeyDictionary &dict; + shared_ptr<ostream> stream; + const string table_name; +}; + +class SampleStreamParser { +public: + // TODO: return number of samples found for progress indication? + virtual void process(mutable_buffers_1 buffer) = 0; + + virtual sample_format_type type() { + return type_; + } + +protected: + sample_format_type type_; + + SampleStreamParser(const sample_format_type type) : type_(type) { + } +}; + +class KeyValueSampleStreamParser : public SampleStreamParser { + +public: + KeyValueSampleStreamParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) : + SampleStreamParser(sample_format_type::CSV), output(output), dict(dict), + line(make_shared<vector<uint8_t>>()) { + } + + void process(mutable_buffers_1 buffer) override; + +private: + void process_line(shared_ptr<vector<uint8_t>> packet); + + static const uint8_t packet_delimiter = '\n'; + KeyDictionary &dict; + shared_ptr<SampleOutputStream> output; + shared_ptr<vector<uint8_t>> line; +}; + +class AutoSampleParser : public SampleStreamParser { +public: + AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict); + +private: + unique_ptr<SampleStreamParser> parser; + unique_ptr<KeyValueSampleStreamParser> keyValueParser; +public: + virtual void process(mutable_buffers_1 buffer); +}; + +class sample_output_stream_option { +public: + virtual ~sample_output_stream_option() { + }; +}; + +class output_fields : public sample_output_stream_option { +public: + ~output_fields() { + } + + vector<string> fields; +}; + +class timestamp_field : public sample_output_stream_option { +public: + timestamp_field(string name) : name(name) { + } + + ~timestamp_field() { + } + + string name; +}; + +// TODO: rename to open_sample_stream_parser +unique_ptr<SampleStreamParser> open_sample_input_stream( + shared_ptr<SampleOutputStream> output, + KeyDictionary &dict, + sample_format_type type = sample_format_type::AUTO); + +unique_ptr<SampleOutputStream> open_sample_output_stream( + shared_ptr<ostream> output, + KeyDictionary &dict, + sample_format_type type, +vector<sample_output_stream_option *> options); + +static inline +unique_ptr<SampleOutputStream> open_sample_output_stream( + shared_ptr<ostream> output, + KeyDictionary &dict, + sample_format_type type) { +return open_sample_output_stream(output, dict, type); +} + +static inline +unique_ptr<ThreadSafeSampleOutputStream> thread_safe_sample_output_stream(unique_ptr<SampleOutputStream> underlying) { + return make_unique<ThreadSafeSampleOutputStream>(move(underlying)); +}; + + +} +} +} |