diff options
Diffstat (limited to 'apps/SoilMoistureIo.h')
-rw-r--r-- | apps/SoilMoistureIo.h | 427 |
1 files changed, 0 insertions, 427 deletions
diff --git a/apps/SoilMoistureIo.h b/apps/SoilMoistureIo.h deleted file mode 100644 index 386296a..0000000 --- a/apps/SoilMoistureIo.h +++ /dev/null @@ -1,427 +0,0 @@ -#ifndef SOIL_MOISTURE_IO_H -#define SOIL_MOISTURE_IO_H - -#include <ostream> -#include <vector> -#include <map> -#include <map> -#include <memory> -#include <boost/asio/buffer.hpp> -#include <boost/optional.hpp> -#include <boost/lexical_cast.hpp> -#include <functional> -#include <mutex> - -// TODO: rename to trygvis::sample -namespace trygvis { -namespace soil_moisture { - -using namespace std; -using namespace boost::asio; - -template<typename A> -using o = boost::optional<A>; - -enum class sample_format_type { - AUTO, - CSV, - KEY_VALUE, - JSON, - SQL, - RRD, -}; - -string to_string(const sample_format_type &arg); - -std::ostream& operator<<(std::ostream& os, sample_format_type const& type); - -std::istream& operator>>(std::istream& is, sample_format_type& type); - -class SampleStreamParser; - -class SampleOutputStream; - -class KeyDictionary; - -class SampleKey; - -// TODO: rename to open_sample_stream_parser -unique_ptr<SampleStreamParser> open_sample_input_stream( - shared_ptr<SampleOutputStream> output, - KeyDictionary &dict, - sample_format_type type = sample_format_type::AUTO); - -class sample_output_stream_option { -public: - virtual ~sample_output_stream_option() { - }; -}; - -class output_fields : public sample_output_stream_option { -public: -// output_fields() { -// } -// -// output_fields(std::vector<string>::iterator begin, std::vector<string>::iterator end) : -// fields(begin, end) { -// } - - ~output_fields() { - } - - vector<string> fields; -}; - - -class timestamp_field : public sample_output_stream_option { -public: - timestamp_field(string name) : name(name) { - } - - ~timestamp_field() { - } - - string name; -}; - -unique_ptr<SampleOutputStream> open_sample_output_stream( - shared_ptr<ostream> output, - KeyDictionary &dict, - sample_format_type type, - vector<sample_output_stream_option *> options); - -static inline -unique_ptr<SampleOutputStream> open_sample_output_stream( - shared_ptr<ostream> output, - KeyDictionary &dict, - sample_format_type type) { - return open_sample_output_stream(output, dict, type); -} - -class ThreadSafeSampleOutputStream; - -static inline -unique_ptr<ThreadSafeSampleOutputStream> thread_safe_sample_output_stream(unique_ptr<SampleOutputStream> underlying) { - return make_unique<ThreadSafeSampleOutputStream>(move(underlying)); -}; - -class sample_exception : public runtime_error { -public: - sample_exception(const string &what) : runtime_error(what) { - } -}; - -class KeyDictionary; - -using SampleKeyVector = vector<SampleKey *>; -using SampleKeyIndex = SampleKeyVector::size_type; - -struct SampleKey { -private: - SampleKey(const SampleKey& that) = delete; - SampleKey(SampleKeyIndex index, const string &name) : index(index), name(name) { - if (name.length() == 0) { - throw sample_exception("Bad sample key."); - } - } - -public: - friend class KeyDictionary; - - inline - bool operator==(const SampleKey &that) const { - return name == that.name; - } - - const SampleKeyIndex index; - const string name; -}; - -class KeyDictionary { -public: - KeyDictionary() { - } - - ~KeyDictionary() { - std::for_each(keys.begin(), keys.end(), std::default_delete<SampleKey>()); - } - KeyDictionary(KeyDictionary& that) = delete; - - SampleKey *indexOf(const string key) { - SampleKeyIndex i = 0; - for (auto ptr = keys.cbegin(); ptr != keys.cend(); ptr++, i++) { - if ((*ptr)->name == key) { - return *ptr; - } - } - - i = keys.size(); - auto sample_key = new SampleKey(i, key); - keys.push_back(sample_key); - - return sample_key; - } - - SampleKey *at(SampleKeyIndex i) const { - if (i >= keys.size()) { - throw sample_exception("Out of bounds"); - } - - return keys.at(i); - } - - vector<SampleKey *> findIndexes(SampleKeyVector &keys) { - vector<SampleKey *> indexes; - - for (auto &key: keys) { - auto index = indexOf(key->name); - indexes.push_back(index); - } - - return indexes; - } - - inline - SampleKeyVector::const_iterator end() const { - return keys.cend(); - } - - inline - SampleKeyVector::const_iterator begin() const { - return keys.cbegin(); - } - -// string nameOf(SampleKeyIndex index) { -// return keys.at(index).name; -// } - - inline - SampleKeyVector::size_type size() const { - return keys.size(); - } - - inline - bool empty() const { - return keys.empty(); - } - -private: - SampleKeyVector keys; -}; - -class SampleRecord { -public: - typedef vector<o<string>> vec; - - SampleRecord(KeyDictionary &dict) : dict(dict) { - } - - SampleRecord(KeyDictionary &dict, vec values) - : dict(dict), values(values) { - } - - inline - vec::const_iterator cbegin() const { - return values.cbegin(); - } - - inline - vec::const_iterator cend() const { - return values.cend(); - } - - inline - bool empty() const { - return values.empty(); - } - - const o<string> at(const SampleKey *key) const { - SampleKeyIndex index = key->index; - if (index >= values.size()) { - return o<string>(); - } - - return values.at(index); - } - - void set(const SampleKey *key, const std::string &value) { - values.resize(max(values.size(), key->index + 1)); - - values.at(key->index).reset(value); - } - - template<class A> - const o<A> lexical_at(const SampleKey *key) const { - auto value = at(key); - - if (!value) { - return o<A>(); - } - - return o<A>(boost::lexical_cast<A>(value.get())); - } - - string to_string() const { - SampleKeyIndex i = 0; - string s; - for (auto ptr = values.begin(); ptr != values.end(); ptr++, i++) { - auto o = *ptr; - - if (!o) { - continue; - } - - auto value = o.get(); - - s += dict.at(i)->name + " = " + value + ", "; - } - return s; - } - - KeyDictionary &dict; -private: - vec values; -}; - -class SampleOutputStream { -public: - virtual void write(SampleRecord const &sample) = 0; -}; - -class VectorSampleOutputStream : public SampleOutputStream { - -public: - virtual void write(SampleRecord const &sample) override; - -public: - vector<SampleRecord> samples; -}; - -class ThreadSafeSampleOutputStream : public SampleOutputStream { -public: - ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying); - - ~ThreadSafeSampleOutputStream() { - } - - void write(SampleRecord const &sample) override; - -private: - unique_ptr<SampleOutputStream> underlying; - std::mutex mutex; -}; - -class CsvSampleOutputStream : public SampleOutputStream { -public: - CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict); - - void write(SampleRecord const &sample); - - const KeyDictionary &getDict() { - return dict; - } - -private: - void writeHeader(); - - KeyDictionary &dict; - shared_ptr<ostream> stream; - bool headerWritten; -}; - -class JsonSampleOutputStream : public SampleOutputStream { -public: - JsonSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict); - - void write(SampleRecord const &sample) override; - -private: - KeyDictionary &dict; - shared_ptr<ostream> stream; -}; - -class KeyValueSampleOutputStream : public SampleOutputStream { -public: - KeyValueSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict); - - void write(SampleRecord const &sample) override; - -private: - KeyDictionary &dict; - shared_ptr<ostream> stream; -}; - -class RrdSampleOutputStream : public SampleOutputStream { -public: - RrdSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey *timestamp_key, o<output_fields *> output_fields); - - void write(SampleRecord const &sample) override; - -private: - vector<SampleKey *> keys; - shared_ptr<ostream> stream; - const SampleKey *timestamp_key; -}; - -class SqlSampleOutputStream : public SampleOutputStream { -public: - SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name); - - void write(SampleRecord const &sample) override; - -private: - KeyDictionary &dict; - shared_ptr<ostream> stream; - const string table_name; -}; - -class SampleStreamParser { -public: - // TODO: return number of samples found for progress indication? - virtual void process(mutable_buffers_1 buffer) = 0; - - virtual sample_format_type type() { - return type_; - } - -protected: - sample_format_type type_; - - SampleStreamParser(const sample_format_type type) : type_(type) { - } -}; - -class KeyValueSampleStreamParser : public SampleStreamParser { - -public: - KeyValueSampleStreamParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) : - SampleStreamParser(sample_format_type::CSV), output(output), dict(dict), - line(make_shared<vector<uint8_t>>()) { - } - - void process(mutable_buffers_1 buffer) override; - -private: - void process_line(shared_ptr<vector<uint8_t>> packet); - - static const uint8_t packet_delimiter = '\n'; - KeyDictionary &dict; - shared_ptr<SampleOutputStream> output; - shared_ptr<vector<uint8_t>> line; -}; - -class AutoSampleParser : public SampleStreamParser { -public: - AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict); - -private: - unique_ptr<SampleStreamParser> parser; - unique_ptr<KeyValueSampleStreamParser> keyValueParser; -public: - virtual void process(mutable_buffers_1 buffer); -}; - -} -} - -#endif |