aboutsummaryrefslogtreecommitdiff
path: root/sensor/include/trygvis/sensor/io.h
diff options
context:
space:
mode:
Diffstat (limited to 'sensor/include/trygvis/sensor/io.h')
-rw-r--r--sensor/include/trygvis/sensor/io.h210
1 files changed, 210 insertions, 0 deletions
diff --git a/sensor/include/trygvis/sensor/io.h b/sensor/include/trygvis/sensor/io.h
new file mode 100644
index 0000000..7db7615
--- /dev/null
+++ b/sensor/include/trygvis/sensor/io.h
@@ -0,0 +1,210 @@
+#pragma once
+
+#include "trygvis/sensor.h"
+
+#include <mutex>
+#include "boost/asio/buffer.hpp"
+
+namespace trygvis {
+namespace sensor {
+namespace io {
+
+using namespace std;
+using namespace boost::asio;
+
+class output_fields;
+
+class timestamp_field;
+
+class SampleOutputStream {
+public:
+ virtual void write(SampleRecord const &sample) = 0;
+};
+
+class VectorSampleOutputStream : public SampleOutputStream {
+
+public:
+ virtual void write(SampleRecord const &sample) override;
+
+public:
+ vector<SampleRecord> samples;
+};
+
+class ThreadSafeSampleOutputStream : public SampleOutputStream {
+public:
+ ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying);
+
+ ~ThreadSafeSampleOutputStream() {
+ }
+
+ void write(SampleRecord const &sample) override;
+
+private:
+ unique_ptr<SampleOutputStream> underlying;
+ std::mutex mutex;
+};
+
+class CsvSampleOutputStream : public SampleOutputStream {
+public:
+ CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict);
+
+ void write(SampleRecord const &sample);
+
+ const KeyDictionary &getDict() {
+ return dict;
+ }
+
+private:
+ void writeHeader();
+
+ KeyDictionary &dict;
+ shared_ptr<ostream> stream;
+ bool headerWritten;
+};
+
+class JsonSampleOutputStream : public SampleOutputStream {
+public:
+ JsonSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict);
+
+ void write(SampleRecord const &sample) override;
+
+private:
+ KeyDictionary &dict;
+ shared_ptr<ostream> stream;
+};
+
+class KeyValueSampleOutputStream : public SampleOutputStream {
+public:
+ KeyValueSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict);
+
+ void write(SampleRecord const &sample) override;
+
+private:
+ KeyDictionary &dict;
+ shared_ptr<ostream> stream;
+};
+
+class RrdSampleOutputStream : public SampleOutputStream {
+public:
+ RrdSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey *timestamp_key, o<output_fields *> output_fields);
+
+ void write(SampleRecord const &sample) override;
+
+private:
+ vector<SampleKey *> keys;
+ shared_ptr<ostream> stream;
+ const SampleKey *timestamp_key;
+};
+
+class SqlSampleOutputStream : public SampleOutputStream {
+public:
+ SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name);
+
+ void write(SampleRecord const &sample) override;
+
+private:
+ KeyDictionary &dict;
+ shared_ptr<ostream> stream;
+ const string table_name;
+};
+
+class SampleStreamParser {
+public:
+ // TODO: return number of samples found for progress indication?
+ virtual void process(mutable_buffers_1 buffer) = 0;
+
+ virtual sample_format_type type() {
+ return type_;
+ }
+
+protected:
+ sample_format_type type_;
+
+ SampleStreamParser(const sample_format_type type) : type_(type) {
+ }
+};
+
+class KeyValueSampleStreamParser : public SampleStreamParser {
+
+public:
+ KeyValueSampleStreamParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) :
+ SampleStreamParser(sample_format_type::CSV), output(output), dict(dict),
+ line(make_shared<vector<uint8_t>>()) {
+ }
+
+ void process(mutable_buffers_1 buffer) override;
+
+private:
+ void process_line(shared_ptr<vector<uint8_t>> packet);
+
+ static const uint8_t packet_delimiter = '\n';
+ KeyDictionary &dict;
+ shared_ptr<SampleOutputStream> output;
+ shared_ptr<vector<uint8_t>> line;
+};
+
+class AutoSampleParser : public SampleStreamParser {
+public:
+ AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict);
+
+private:
+ unique_ptr<SampleStreamParser> parser;
+ unique_ptr<KeyValueSampleStreamParser> keyValueParser;
+public:
+ virtual void process(mutable_buffers_1 buffer);
+};
+
+class sample_output_stream_option {
+public:
+ virtual ~sample_output_stream_option() {
+ };
+};
+
+class output_fields : public sample_output_stream_option {
+public:
+ ~output_fields() {
+ }
+
+ vector<string> fields;
+};
+
+class timestamp_field : public sample_output_stream_option {
+public:
+ timestamp_field(string name) : name(name) {
+ }
+
+ ~timestamp_field() {
+ }
+
+ string name;
+};
+
+// TODO: rename to open_sample_stream_parser
+unique_ptr<SampleStreamParser> open_sample_input_stream(
+ shared_ptr<SampleOutputStream> output,
+ KeyDictionary &dict,
+ sample_format_type type = sample_format_type::AUTO);
+
+unique_ptr<SampleOutputStream> open_sample_output_stream(
+ shared_ptr<ostream> output,
+ KeyDictionary &dict,
+ sample_format_type type,
+vector<sample_output_stream_option *> options);
+
+static inline
+unique_ptr<SampleOutputStream> open_sample_output_stream(
+ shared_ptr<ostream> output,
+ KeyDictionary &dict,
+ sample_format_type type) {
+return open_sample_output_stream(output, dict, type);
+}
+
+static inline
+unique_ptr<ThreadSafeSampleOutputStream> thread_safe_sample_output_stream(unique_ptr<SampleOutputStream> underlying) {
+ return make_unique<ThreadSafeSampleOutputStream>(move(underlying));
+};
+
+
+}
+}
+}