diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2015-03-22 18:12:48 +0100 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2015-03-22 18:12:48 +0100 |
commit | 6bac290b92b635be047237b880144dbc163df6ec (patch) | |
tree | ab2a00f15d1c698ae37966c494000219a0a74e2f /sensor/include/trygvis/sensor | |
parent | d7d545575250f616f0b9e2243e08544ab2794a03 (diff) | |
download | ble-toys-6bac290b92b635be047237b880144dbc163df6ec.tar.gz ble-toys-6bac290b92b635be047237b880144dbc163df6ec.tar.bz2 ble-toys-6bac290b92b635be047237b880144dbc163df6ec.tar.xz ble-toys-6bac290b92b635be047237b880144dbc163df6ec.zip |
o Splitting out io parts into trygvis::sensor::io.
Diffstat (limited to 'sensor/include/trygvis/sensor')
-rw-r--r-- | sensor/include/trygvis/sensor/io.h | 210 |
1 files changed, 210 insertions, 0 deletions
diff --git a/sensor/include/trygvis/sensor/io.h b/sensor/include/trygvis/sensor/io.h new file mode 100644 index 0000000..7db7615 --- /dev/null +++ b/sensor/include/trygvis/sensor/io.h @@ -0,0 +1,210 @@ +#pragma once + +#include "trygvis/sensor.h" + +#include <mutex> +#include "boost/asio/buffer.hpp" + +namespace trygvis { +namespace sensor { +namespace io { + +using namespace std; +using namespace boost::asio; + +class output_fields; + +class timestamp_field; + +class SampleOutputStream { +public: + virtual void write(SampleRecord const &sample) = 0; +}; + +class VectorSampleOutputStream : public SampleOutputStream { + +public: + virtual void write(SampleRecord const &sample) override; + +public: + vector<SampleRecord> samples; +}; + +class ThreadSafeSampleOutputStream : public SampleOutputStream { +public: + ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying); + + ~ThreadSafeSampleOutputStream() { + } + + void write(SampleRecord const &sample) override; + +private: + unique_ptr<SampleOutputStream> underlying; + std::mutex mutex; +}; + +class CsvSampleOutputStream : public SampleOutputStream { +public: + CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict); + + void write(SampleRecord const &sample); + + const KeyDictionary &getDict() { + return dict; + } + +private: + void writeHeader(); + + KeyDictionary &dict; + shared_ptr<ostream> stream; + bool headerWritten; +}; + +class JsonSampleOutputStream : public SampleOutputStream { +public: + JsonSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict); + + void write(SampleRecord const &sample) override; + +private: + KeyDictionary &dict; + shared_ptr<ostream> stream; +}; + +class KeyValueSampleOutputStream : public SampleOutputStream { +public: + KeyValueSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict); + + void write(SampleRecord const &sample) override; + +private: + KeyDictionary &dict; + shared_ptr<ostream> stream; +}; + +class RrdSampleOutputStream : public SampleOutputStream { +public: + RrdSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey *timestamp_key, o<output_fields *> output_fields); + + void write(SampleRecord const &sample) override; + +private: + vector<SampleKey *> keys; + shared_ptr<ostream> stream; + const SampleKey *timestamp_key; +}; + +class SqlSampleOutputStream : public SampleOutputStream { +public: + SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name); + + void write(SampleRecord const &sample) override; + +private: + KeyDictionary &dict; + shared_ptr<ostream> stream; + const string table_name; +}; + +class SampleStreamParser { +public: + // TODO: return number of samples found for progress indication? + virtual void process(mutable_buffers_1 buffer) = 0; + + virtual sample_format_type type() { + return type_; + } + +protected: + sample_format_type type_; + + SampleStreamParser(const sample_format_type type) : type_(type) { + } +}; + +class KeyValueSampleStreamParser : public SampleStreamParser { + +public: + KeyValueSampleStreamParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) : + SampleStreamParser(sample_format_type::CSV), output(output), dict(dict), + line(make_shared<vector<uint8_t>>()) { + } + + void process(mutable_buffers_1 buffer) override; + +private: + void process_line(shared_ptr<vector<uint8_t>> packet); + + static const uint8_t packet_delimiter = '\n'; + KeyDictionary &dict; + shared_ptr<SampleOutputStream> output; + shared_ptr<vector<uint8_t>> line; +}; + +class AutoSampleParser : public SampleStreamParser { +public: + AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict); + +private: + unique_ptr<SampleStreamParser> parser; + unique_ptr<KeyValueSampleStreamParser> keyValueParser; +public: + virtual void process(mutable_buffers_1 buffer); +}; + +class sample_output_stream_option { +public: + virtual ~sample_output_stream_option() { + }; +}; + +class output_fields : public sample_output_stream_option { +public: + ~output_fields() { + } + + vector<string> fields; +}; + +class timestamp_field : public sample_output_stream_option { +public: + timestamp_field(string name) : name(name) { + } + + ~timestamp_field() { + } + + string name; +}; + +// TODO: rename to open_sample_stream_parser +unique_ptr<SampleStreamParser> open_sample_input_stream( + shared_ptr<SampleOutputStream> output, + KeyDictionary &dict, + sample_format_type type = sample_format_type::AUTO); + +unique_ptr<SampleOutputStream> open_sample_output_stream( + shared_ptr<ostream> output, + KeyDictionary &dict, + sample_format_type type, +vector<sample_output_stream_option *> options); + +static inline +unique_ptr<SampleOutputStream> open_sample_output_stream( + shared_ptr<ostream> output, + KeyDictionary &dict, + sample_format_type type) { +return open_sample_output_stream(output, dict, type); +} + +static inline +unique_ptr<ThreadSafeSampleOutputStream> thread_safe_sample_output_stream(unique_ptr<SampleOutputStream> underlying) { + return make_unique<ThreadSafeSampleOutputStream>(move(underlying)); +}; + + +} +} +} |