aboutsummaryrefslogtreecommitdiff
path: root/sensor/include/trygvis
diff options
context:
space:
mode:
Diffstat (limited to 'sensor/include/trygvis')
-rw-r--r--sensor/include/trygvis/SensorSample.h423
1 files changed, 423 insertions, 0 deletions
diff --git a/sensor/include/trygvis/SensorSample.h b/sensor/include/trygvis/SensorSample.h
new file mode 100644
index 0000000..438e2ae
--- /dev/null
+++ b/sensor/include/trygvis/SensorSample.h
@@ -0,0 +1,423 @@
+#pragma once
+
+#include <ostream>
+#include <vector>
+#include <map>
+#include <map>
+#include <memory>
+#include <boost/asio/buffer.hpp>
+#include <boost/optional.hpp>
+#include <boost/lexical_cast.hpp>
+#include <functional>
+#include <mutex>
+
+namespace trygvis {
+namespace sensor {
+
+using namespace std;
+using namespace boost::asio;
+
+template<typename A>
+using o = boost::optional<A>;
+
+enum class sample_format_type {
+ AUTO,
+ CSV,
+ KEY_VALUE,
+ JSON,
+ SQL,
+ RRD,
+};
+
+string to_string(const sample_format_type &arg);
+
+std::ostream& operator<<(std::ostream& os, sample_format_type const& type);
+
+std::istream& operator>>(std::istream& is, sample_format_type& type);
+
+class SampleStreamParser;
+
+class SampleOutputStream;
+
+class KeyDictionary;
+
+class SampleKey;
+
+// TODO: rename to open_sample_stream_parser
+unique_ptr<SampleStreamParser> open_sample_input_stream(
+ shared_ptr<SampleOutputStream> output,
+ KeyDictionary &dict,
+ sample_format_type type = sample_format_type::AUTO);
+
+class sample_output_stream_option {
+public:
+ virtual ~sample_output_stream_option() {
+ };
+};
+
+class output_fields : public sample_output_stream_option {
+public:
+// output_fields() {
+// }
+//
+// output_fields(std::vector<string>::iterator begin, std::vector<string>::iterator end) :
+// fields(begin, end) {
+// }
+
+ ~output_fields() {
+ }
+
+ vector<string> fields;
+};
+
+
+class timestamp_field : public sample_output_stream_option {
+public:
+ timestamp_field(string name) : name(name) {
+ }
+
+ ~timestamp_field() {
+ }
+
+ string name;
+};
+
+unique_ptr<SampleOutputStream> open_sample_output_stream(
+ shared_ptr<ostream> output,
+ KeyDictionary &dict,
+ sample_format_type type,
+ vector<sample_output_stream_option *> options);
+
+static inline
+unique_ptr<SampleOutputStream> open_sample_output_stream(
+ shared_ptr<ostream> output,
+ KeyDictionary &dict,
+ sample_format_type type) {
+ return open_sample_output_stream(output, dict, type);
+}
+
+class ThreadSafeSampleOutputStream;
+
+static inline
+unique_ptr<ThreadSafeSampleOutputStream> thread_safe_sample_output_stream(unique_ptr<SampleOutputStream> underlying) {
+ return make_unique<ThreadSafeSampleOutputStream>(move(underlying));
+};
+
+class sample_exception : public runtime_error {
+public:
+ sample_exception(const string &what) : runtime_error(what) {
+ }
+};
+
+class KeyDictionary;
+
+using SampleKeyVector = vector<SampleKey *>;
+using SampleKeyIndex = SampleKeyVector::size_type;
+
+struct SampleKey {
+private:
+ SampleKey(const SampleKey& that) = delete;
+ SampleKey(SampleKeyIndex index, const string &name) : index(index), name(name) {
+ if (name.length() == 0) {
+ throw sample_exception("Bad sample key.");
+ }
+ }
+
+public:
+ friend class KeyDictionary;
+
+ inline
+ bool operator==(const SampleKey &that) const {
+ return name == that.name;
+ }
+
+ const SampleKeyIndex index;
+ const string name;
+};
+
+class KeyDictionary {
+public:
+ KeyDictionary() {
+ }
+
+ ~KeyDictionary() {
+ std::for_each(keys.begin(), keys.end(), std::default_delete<SampleKey>());
+ }
+ KeyDictionary(KeyDictionary& that) = delete;
+
+ SampleKey *indexOf(const string key) {
+ SampleKeyIndex i = 0;
+ for (auto ptr = keys.cbegin(); ptr != keys.cend(); ptr++, i++) {
+ if ((*ptr)->name == key) {
+ return *ptr;
+ }
+ }
+
+ i = keys.size();
+ auto sample_key = new SampleKey(i, key);
+ keys.push_back(sample_key);
+
+ return sample_key;
+ }
+
+ SampleKey *at(SampleKeyIndex i) const {
+ if (i >= keys.size()) {
+ throw sample_exception("Out of bounds");
+ }
+
+ return keys.at(i);
+ }
+
+ vector<SampleKey *> findIndexes(SampleKeyVector &keys) {
+ vector<SampleKey *> indexes;
+
+ for (auto &key: keys) {
+ auto index = indexOf(key->name);
+ indexes.push_back(index);
+ }
+
+ return indexes;
+ }
+
+ inline
+ SampleKeyVector::const_iterator end() const {
+ return keys.cend();
+ }
+
+ inline
+ SampleKeyVector::const_iterator begin() const {
+ return keys.cbegin();
+ }
+
+// string nameOf(SampleKeyIndex index) {
+// return keys.at(index).name;
+// }
+
+ inline
+ SampleKeyVector::size_type size() const {
+ return keys.size();
+ }
+
+ inline
+ bool empty() const {
+ return keys.empty();
+ }
+
+private:
+ SampleKeyVector keys;
+};
+
+class SampleRecord {
+public:
+ typedef vector<o<string>> vec;
+
+ SampleRecord(KeyDictionary &dict) : dict(dict) {
+ }
+
+ SampleRecord(KeyDictionary &dict, vec values)
+ : dict(dict), values(values) {
+ }
+
+ inline
+ vec::const_iterator cbegin() const {
+ return values.cbegin();
+ }
+
+ inline
+ vec::const_iterator cend() const {
+ return values.cend();
+ }
+
+ inline
+ bool empty() const {
+ return values.empty();
+ }
+
+ const o<string> at(const SampleKey *key) const {
+ SampleKeyIndex index = key->index;
+ if (index >= values.size()) {
+ return o<string>();
+ }
+
+ return values.at(index);
+ }
+
+ void set(const SampleKey *key, const std::string &value) {
+ values.resize(max(values.size(), key->index + 1));
+
+ values.at(key->index).reset(value);
+ }
+
+ template<class A>
+ const o<A> lexical_at(const SampleKey *key) const {
+ auto value = at(key);
+
+ if (!value) {
+ return o<A>();
+ }
+
+ return o<A>(boost::lexical_cast<A>(value.get()));
+ }
+
+ string to_string() const {
+ SampleKeyIndex i = 0;
+ string s;
+ for (auto ptr = values.begin(); ptr != values.end(); ptr++, i++) {
+ auto o = *ptr;
+
+ if (!o) {
+ continue;
+ }
+
+ auto value = o.get();
+
+ s += dict.at(i)->name + " = " + value + ", ";
+ }
+ return s;
+ }
+
+ KeyDictionary &dict;
+private:
+ vec values;
+};
+
+class SampleOutputStream {
+public:
+ virtual void write(SampleRecord const &sample) = 0;
+};
+
+class VectorSampleOutputStream : public SampleOutputStream {
+
+public:
+ virtual void write(SampleRecord const &sample) override;
+
+public:
+ vector<SampleRecord> samples;
+};
+
+class ThreadSafeSampleOutputStream : public SampleOutputStream {
+public:
+ ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying);
+
+ ~ThreadSafeSampleOutputStream() {
+ }
+
+ void write(SampleRecord const &sample) override;
+
+private:
+ unique_ptr<SampleOutputStream> underlying;
+ std::mutex mutex;
+};
+
+class CsvSampleOutputStream : public SampleOutputStream {
+public:
+ CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict);
+
+ void write(SampleRecord const &sample);
+
+ const KeyDictionary &getDict() {
+ return dict;
+ }
+
+private:
+ void writeHeader();
+
+ KeyDictionary &dict;
+ shared_ptr<ostream> stream;
+ bool headerWritten;
+};
+
+class JsonSampleOutputStream : public SampleOutputStream {
+public:
+ JsonSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict);
+
+ void write(SampleRecord const &sample) override;
+
+private:
+ KeyDictionary &dict;
+ shared_ptr<ostream> stream;
+};
+
+class KeyValueSampleOutputStream : public SampleOutputStream {
+public:
+ KeyValueSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict);
+
+ void write(SampleRecord const &sample) override;
+
+private:
+ KeyDictionary &dict;
+ shared_ptr<ostream> stream;
+};
+
+class RrdSampleOutputStream : public SampleOutputStream {
+public:
+ RrdSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey *timestamp_key, o<output_fields *> output_fields);
+
+ void write(SampleRecord const &sample) override;
+
+private:
+ vector<SampleKey *> keys;
+ shared_ptr<ostream> stream;
+ const SampleKey *timestamp_key;
+};
+
+class SqlSampleOutputStream : public SampleOutputStream {
+public:
+ SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name);
+
+ void write(SampleRecord const &sample) override;
+
+private:
+ KeyDictionary &dict;
+ shared_ptr<ostream> stream;
+ const string table_name;
+};
+
+class SampleStreamParser {
+public:
+ // TODO: return number of samples found for progress indication?
+ virtual void process(mutable_buffers_1 buffer) = 0;
+
+ virtual sample_format_type type() {
+ return type_;
+ }
+
+protected:
+ sample_format_type type_;
+
+ SampleStreamParser(const sample_format_type type) : type_(type) {
+ }
+};
+
+class KeyValueSampleStreamParser : public SampleStreamParser {
+
+public:
+ KeyValueSampleStreamParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) :
+ SampleStreamParser(sample_format_type::CSV), output(output), dict(dict),
+ line(make_shared<vector<uint8_t>>()) {
+ }
+
+ void process(mutable_buffers_1 buffer) override;
+
+private:
+ void process_line(shared_ptr<vector<uint8_t>> packet);
+
+ static const uint8_t packet_delimiter = '\n';
+ KeyDictionary &dict;
+ shared_ptr<SampleOutputStream> output;
+ shared_ptr<vector<uint8_t>> line;
+};
+
+class AutoSampleParser : public SampleStreamParser {
+public:
+ AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict);
+
+private:
+ unique_ptr<SampleStreamParser> parser;
+ unique_ptr<KeyValueSampleStreamParser> keyValueParser;
+public:
+ virtual void process(mutable_buffers_1 buffer);
+};
+
+}
+}