diff options
Diffstat (limited to 'sensor/include/trygvis')
-rw-r--r-- | sensor/include/trygvis/SensorSample.h | 423 |
1 files changed, 423 insertions, 0 deletions
diff --git a/sensor/include/trygvis/SensorSample.h b/sensor/include/trygvis/SensorSample.h new file mode 100644 index 0000000..438e2ae --- /dev/null +++ b/sensor/include/trygvis/SensorSample.h @@ -0,0 +1,423 @@ +#pragma once + +#include <ostream> +#include <vector> +#include <map> +#include <map> +#include <memory> +#include <boost/asio/buffer.hpp> +#include <boost/optional.hpp> +#include <boost/lexical_cast.hpp> +#include <functional> +#include <mutex> + +namespace trygvis { +namespace sensor { + +using namespace std; +using namespace boost::asio; + +template<typename A> +using o = boost::optional<A>; + +enum class sample_format_type { + AUTO, + CSV, + KEY_VALUE, + JSON, + SQL, + RRD, +}; + +string to_string(const sample_format_type &arg); + +std::ostream& operator<<(std::ostream& os, sample_format_type const& type); + +std::istream& operator>>(std::istream& is, sample_format_type& type); + +class SampleStreamParser; + +class SampleOutputStream; + +class KeyDictionary; + +class SampleKey; + +// TODO: rename to open_sample_stream_parser +unique_ptr<SampleStreamParser> open_sample_input_stream( + shared_ptr<SampleOutputStream> output, + KeyDictionary &dict, + sample_format_type type = sample_format_type::AUTO); + +class sample_output_stream_option { +public: + virtual ~sample_output_stream_option() { + }; +}; + +class output_fields : public sample_output_stream_option { +public: +// output_fields() { +// } +// +// output_fields(std::vector<string>::iterator begin, std::vector<string>::iterator end) : +// fields(begin, end) { +// } + + ~output_fields() { + } + + vector<string> fields; +}; + + +class timestamp_field : public sample_output_stream_option { +public: + timestamp_field(string name) : name(name) { + } + + ~timestamp_field() { + } + + string name; +}; + +unique_ptr<SampleOutputStream> open_sample_output_stream( + shared_ptr<ostream> output, + KeyDictionary &dict, + sample_format_type type, + vector<sample_output_stream_option *> options); + +static inline +unique_ptr<SampleOutputStream> open_sample_output_stream( + shared_ptr<ostream> output, + KeyDictionary &dict, + sample_format_type type) { + return open_sample_output_stream(output, dict, type); +} + +class ThreadSafeSampleOutputStream; + +static inline +unique_ptr<ThreadSafeSampleOutputStream> thread_safe_sample_output_stream(unique_ptr<SampleOutputStream> underlying) { + return make_unique<ThreadSafeSampleOutputStream>(move(underlying)); +}; + +class sample_exception : public runtime_error { +public: + sample_exception(const string &what) : runtime_error(what) { + } +}; + +class KeyDictionary; + +using SampleKeyVector = vector<SampleKey *>; +using SampleKeyIndex = SampleKeyVector::size_type; + +struct SampleKey { +private: + SampleKey(const SampleKey& that) = delete; + SampleKey(SampleKeyIndex index, const string &name) : index(index), name(name) { + if (name.length() == 0) { + throw sample_exception("Bad sample key."); + } + } + +public: + friend class KeyDictionary; + + inline + bool operator==(const SampleKey &that) const { + return name == that.name; + } + + const SampleKeyIndex index; + const string name; +}; + +class KeyDictionary { +public: + KeyDictionary() { + } + + ~KeyDictionary() { + std::for_each(keys.begin(), keys.end(), std::default_delete<SampleKey>()); + } + KeyDictionary(KeyDictionary& that) = delete; + + SampleKey *indexOf(const string key) { + SampleKeyIndex i = 0; + for (auto ptr = keys.cbegin(); ptr != keys.cend(); ptr++, i++) { + if ((*ptr)->name == key) { + return *ptr; + } + } + + i = keys.size(); + auto sample_key = new SampleKey(i, key); + keys.push_back(sample_key); + + return sample_key; + } + + SampleKey *at(SampleKeyIndex i) const { + if (i >= keys.size()) { + throw sample_exception("Out of bounds"); + } + + return keys.at(i); + } + + vector<SampleKey *> findIndexes(SampleKeyVector &keys) { + vector<SampleKey *> indexes; + + for (auto &key: keys) { + auto index = indexOf(key->name); + indexes.push_back(index); + } + + return indexes; + } + + inline + SampleKeyVector::const_iterator end() const { + return keys.cend(); + } + + inline + SampleKeyVector::const_iterator begin() const { + return keys.cbegin(); + } + +// string nameOf(SampleKeyIndex index) { +// return keys.at(index).name; +// } + + inline + SampleKeyVector::size_type size() const { + return keys.size(); + } + + inline + bool empty() const { + return keys.empty(); + } + +private: + SampleKeyVector keys; +}; + +class SampleRecord { +public: + typedef vector<o<string>> vec; + + SampleRecord(KeyDictionary &dict) : dict(dict) { + } + + SampleRecord(KeyDictionary &dict, vec values) + : dict(dict), values(values) { + } + + inline + vec::const_iterator cbegin() const { + return values.cbegin(); + } + + inline + vec::const_iterator cend() const { + return values.cend(); + } + + inline + bool empty() const { + return values.empty(); + } + + const o<string> at(const SampleKey *key) const { + SampleKeyIndex index = key->index; + if (index >= values.size()) { + return o<string>(); + } + + return values.at(index); + } + + void set(const SampleKey *key, const std::string &value) { + values.resize(max(values.size(), key->index + 1)); + + values.at(key->index).reset(value); + } + + template<class A> + const o<A> lexical_at(const SampleKey *key) const { + auto value = at(key); + + if (!value) { + return o<A>(); + } + + return o<A>(boost::lexical_cast<A>(value.get())); + } + + string to_string() const { + SampleKeyIndex i = 0; + string s; + for (auto ptr = values.begin(); ptr != values.end(); ptr++, i++) { + auto o = *ptr; + + if (!o) { + continue; + } + + auto value = o.get(); + + s += dict.at(i)->name + " = " + value + ", "; + } + return s; + } + + KeyDictionary &dict; +private: + vec values; +}; + +class SampleOutputStream { +public: + virtual void write(SampleRecord const &sample) = 0; +}; + +class VectorSampleOutputStream : public SampleOutputStream { + +public: + virtual void write(SampleRecord const &sample) override; + +public: + vector<SampleRecord> samples; +}; + +class ThreadSafeSampleOutputStream : public SampleOutputStream { +public: + ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying); + + ~ThreadSafeSampleOutputStream() { + } + + void write(SampleRecord const &sample) override; + +private: + unique_ptr<SampleOutputStream> underlying; + std::mutex mutex; +}; + +class CsvSampleOutputStream : public SampleOutputStream { +public: + CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict); + + void write(SampleRecord const &sample); + + const KeyDictionary &getDict() { + return dict; + } + +private: + void writeHeader(); + + KeyDictionary &dict; + shared_ptr<ostream> stream; + bool headerWritten; +}; + +class JsonSampleOutputStream : public SampleOutputStream { +public: + JsonSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict); + + void write(SampleRecord const &sample) override; + +private: + KeyDictionary &dict; + shared_ptr<ostream> stream; +}; + +class KeyValueSampleOutputStream : public SampleOutputStream { +public: + KeyValueSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict); + + void write(SampleRecord const &sample) override; + +private: + KeyDictionary &dict; + shared_ptr<ostream> stream; +}; + +class RrdSampleOutputStream : public SampleOutputStream { +public: + RrdSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey *timestamp_key, o<output_fields *> output_fields); + + void write(SampleRecord const &sample) override; + +private: + vector<SampleKey *> keys; + shared_ptr<ostream> stream; + const SampleKey *timestamp_key; +}; + +class SqlSampleOutputStream : public SampleOutputStream { +public: + SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name); + + void write(SampleRecord const &sample) override; + +private: + KeyDictionary &dict; + shared_ptr<ostream> stream; + const string table_name; +}; + +class SampleStreamParser { +public: + // TODO: return number of samples found for progress indication? + virtual void process(mutable_buffers_1 buffer) = 0; + + virtual sample_format_type type() { + return type_; + } + +protected: + sample_format_type type_; + + SampleStreamParser(const sample_format_type type) : type_(type) { + } +}; + +class KeyValueSampleStreamParser : public SampleStreamParser { + +public: + KeyValueSampleStreamParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) : + SampleStreamParser(sample_format_type::CSV), output(output), dict(dict), + line(make_shared<vector<uint8_t>>()) { + } + + void process(mutable_buffers_1 buffer) override; + +private: + void process_line(shared_ptr<vector<uint8_t>> packet); + + static const uint8_t packet_delimiter = '\n'; + KeyDictionary &dict; + shared_ptr<SampleOutputStream> output; + shared_ptr<vector<uint8_t>> line; +}; + +class AutoSampleParser : public SampleStreamParser { +public: + AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict); + +private: + unique_ptr<SampleStreamParser> parser; + unique_ptr<KeyValueSampleStreamParser> keyValueParser; +public: + virtual void process(mutable_buffers_1 buffer); +}; + +} +} |