#ifndef SOIL_MOISTURE_IO_H #define SOIL_MOISTURE_IO_H #include #include #include #include #include #include #include #include #include // TODO: rename to trygvis::sample namespace trygvis { namespace soil_moisture { using namespace std; using namespace boost::asio; template using o = boost::optional; enum class sample_format_type { AUTO, CSV, KEY_VALUE, JSON, SQL, RRD, }; string to_string(const sample_format_type &arg); std::ostream& operator<<(std::ostream& os, sample_format_type const& type); std::istream& operator>>(std::istream& is, sample_format_type& type); class SampleStreamParser; class SampleOutputStream; class KeyDictionary; class SampleKey; unique_ptr open_sample_input_stream( shared_ptr output, KeyDictionary &dict, sample_format_type type = sample_format_type::AUTO); class sample_output_stream_option { public: virtual ~sample_output_stream_option() { }; }; class output_fields : public sample_output_stream_option { public: // output_fields() { // } // // output_fields(std::vector::iterator begin, std::vector::iterator end) : // fields(begin, end) { // } ~output_fields() { } vector fields; }; class timestamp_field : public sample_output_stream_option { public: timestamp_field(string name) : name(name) { } ~timestamp_field() { } string name; }; unique_ptr open_sample_output_stream( shared_ptr output, KeyDictionary &dict, sample_format_type type, vector options); static inline unique_ptr open_sample_output_stream( shared_ptr output, KeyDictionary &dict, sample_format_type type) { return open_sample_output_stream(output, dict, type); } class sample_exception : public runtime_error { public: sample_exception(const string &what) : runtime_error(what) { } }; class KeyDictionary; using SampleKeyVector = vector; using SampleKeyIndex = SampleKeyVector::size_type; struct SampleKey { private: SampleKey(const SampleKey& that) = delete; SampleKey(SampleKeyIndex index, const string &name) : index(index), name(name) { if (name.length() == 0) { throw sample_exception("Bad sample key."); } } public: friend class KeyDictionary; inline bool operator==(const SampleKey &that) const { return name == that.name; } const SampleKeyIndex index; const string name; }; class KeyDictionary { public: KeyDictionary() { } ~KeyDictionary() { std::for_each(keys.begin(), keys.end(), std::default_delete()); } KeyDictionary(KeyDictionary& that) = delete; SampleKey *indexOf(const string key) { SampleKeyIndex i = 0; for (auto ptr = keys.cbegin(); ptr != keys.cend(); ptr++, i++) { if ((*ptr)->name == key) { return *ptr; } } i = keys.size(); auto sample_key = new SampleKey(i, key); keys.push_back(sample_key); return sample_key; } SampleKey *at(SampleKeyIndex i) const { if (i >= keys.size()) { throw sample_exception("Out of bounds"); } return keys.at(i); } vector findIndexes(SampleKeyVector &keys) { vector indexes; for (auto &key: keys) { auto index = indexOf(key->name); indexes.push_back(index); } return indexes; } inline SampleKeyVector::const_iterator end() const { return keys.cend(); } inline SampleKeyVector::const_iterator begin() const { return keys.cbegin(); } // string nameOf(SampleKeyIndex index) { // return keys.at(index).name; // } inline SampleKeyVector::size_type size() const { return keys.size(); } inline bool empty() const { return keys.empty(); } private: SampleKeyVector keys; }; class SampleRecord { public: typedef vector> vec; SampleRecord(KeyDictionary &dict) : dict(dict) { } SampleRecord(KeyDictionary &dict, vec values) : dict(dict), values(values) { } inline vec::const_iterator cbegin() const { return values.cbegin(); } inline vec::const_iterator cend() const { return values.cend(); } inline bool empty() const { return values.empty(); } const o at(const SampleKey *key) const { SampleKeyIndex index = key->index; if (index >= values.size()) { return o(); } return values.at(index); } void set(const SampleKey *key, const std::string &value) { values.resize(max(values.size(), key->index + 1)); values.at(key->index).reset(value); } template const o lexical_at(const SampleKey *key) const { auto value = at(key); if (!value) { return o(); } return o(boost::lexical_cast(value.get())); } string to_string() const { SampleKeyIndex i = 0; string s; for (auto ptr = values.begin(); ptr != values.end(); ptr++, i++) { auto o = *ptr; if (!o) { continue; } auto value = o.get(); s += dict.at(i)->name + " = " + value + ", "; } return s; } KeyDictionary &dict; private: vec values; }; class SampleOutputStream { public: virtual void write(SampleRecord const &sample) = 0; }; class VectorSampleOutputStream : public SampleOutputStream { public: virtual void write(SampleRecord const &sample) override; public: vector samples; }; class CsvSampleOutputStream : public SampleOutputStream { public: CsvSampleOutputStream(shared_ptr stream, KeyDictionary &dict); void write(SampleRecord const &sample); const KeyDictionary &getDict() { return dict; } private: void writeHeader(); KeyDictionary &dict; shared_ptr stream; bool headerWritten; }; class JsonSampleOutputStream : public SampleOutputStream { public: JsonSampleOutputStream(shared_ptr stream, KeyDictionary &dict); void write(SampleRecord const &sample) override; private: KeyDictionary &dict; shared_ptr stream; }; class KeyValueSampleOutputStream : public SampleOutputStream { public: KeyValueSampleOutputStream(shared_ptr stream, KeyDictionary &dict); void write(SampleRecord const &sample) override; private: KeyDictionary &dict; shared_ptr stream; }; class RrdSampleOutputStream : public SampleOutputStream { public: RrdSampleOutputStream(shared_ptr stream, KeyDictionary &dict, const SampleKey *timestamp_key, o output_fields); void write(SampleRecord const &sample) override; private: vector keys; shared_ptr stream; const SampleKey *timestamp_key; }; class SqlSampleOutputStream : public SampleOutputStream { public: SqlSampleOutputStream(shared_ptr stream, KeyDictionary &dict, string table_name); void write(SampleRecord const &sample) override; private: KeyDictionary &dict; shared_ptr stream; const string table_name; }; class SampleStreamParser { public: // TODO: return number of samples found for progress indication? virtual void process(mutable_buffers_1 buffer) = 0; virtual sample_format_type type() { return type_; } protected: sample_format_type type_; SampleStreamParser(const sample_format_type type) : type_(type) { } }; class KeyValueSampleParser : public SampleStreamParser { public: KeyValueSampleParser(shared_ptr output, KeyDictionary &dict) : SampleStreamParser(sample_format_type::CSV), output(output), dict(dict), line(make_shared>()) { } void process(mutable_buffers_1 buffer) override; private: void process_line(shared_ptr> packet); static const uint8_t packet_delimiter = '\n'; KeyDictionary &dict; shared_ptr output; shared_ptr> line; }; class AutoSampleParser : public SampleStreamParser { public: AutoSampleParser(shared_ptr output, KeyDictionary &dict); private: unique_ptr parser; unique_ptr keyValueParser; public: virtual void process(mutable_buffers_1 buffer); }; } } #endif