From 6bac290b92b635be047237b880144dbc163df6ec Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 22 Mar 2015 18:12:48 +0100 Subject: o Splitting out io parts into trygvis::sensor::io. --- sensor/CMakeLists.txt | 7 +- sensor/include/trygvis/SensorSample.h | 423 ------------------------------- sensor/include/trygvis/sensor.h | 207 ++++++++++++++++ sensor/include/trygvis/sensor/io.h | 210 ++++++++++++++++ sensor/main/SensorSample.cpp | 454 ---------------------------------- sensor/main/io.cpp | 418 +++++++++++++++++++++++++++++++ sensor/main/sensor.cpp | 79 ++++++ 7 files changed, 918 insertions(+), 880 deletions(-) delete mode 100644 sensor/include/trygvis/SensorSample.h create mode 100644 sensor/include/trygvis/sensor.h create mode 100644 sensor/include/trygvis/sensor/io.h delete mode 100644 sensor/main/SensorSample.cpp create mode 100644 sensor/main/io.cpp create mode 100644 sensor/main/sensor.cpp (limited to 'sensor') diff --git a/sensor/CMakeLists.txt b/sensor/CMakeLists.txt index 7cee42b..14804f1 100644 --- a/sensor/CMakeLists.txt +++ b/sensor/CMakeLists.txt @@ -1,11 +1,12 @@ -file(GLOB INCLUDES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} include/*.h) +file(GLOB_RECURSE INCLUDES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} include/*.h) add_library(trygvis-sensor - main/SensorSample.cpp + main/sensor.cpp + main/io.cpp ${INCLUDES}) include_directories("${PROJECT_SOURCE_DIR}/json/src") include_directories(include) # Boost -find_package(Boost COMPONENTS regex system program_options REQUIRED) +find_package(Boost COMPONENTS regex system REQUIRED) diff --git a/sensor/include/trygvis/SensorSample.h b/sensor/include/trygvis/SensorSample.h deleted file mode 100644 index 438e2ae..0000000 --- a/sensor/include/trygvis/SensorSample.h +++ /dev/null @@ -1,423 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace trygvis { -namespace sensor { - -using namespace std; -using namespace boost::asio; - -template -using o = boost::optional; - -enum class sample_format_type { - AUTO, - CSV, - KEY_VALUE, - JSON, - SQL, - RRD, -}; - -string to_string(const sample_format_type &arg); - -std::ostream& operator<<(std::ostream& os, sample_format_type const& type); - -std::istream& operator>>(std::istream& is, sample_format_type& type); - -class SampleStreamParser; - -class SampleOutputStream; - -class KeyDictionary; - -class SampleKey; - -// TODO: rename to open_sample_stream_parser -unique_ptr open_sample_input_stream( - shared_ptr output, - KeyDictionary &dict, - sample_format_type type = sample_format_type::AUTO); - -class sample_output_stream_option { -public: - virtual ~sample_output_stream_option() { - }; -}; - -class output_fields : public sample_output_stream_option { -public: -// output_fields() { -// } -// -// output_fields(std::vector::iterator begin, std::vector::iterator end) : -// fields(begin, end) { -// } - - ~output_fields() { - } - - vector fields; -}; - - -class timestamp_field : public sample_output_stream_option { -public: - timestamp_field(string name) : name(name) { - } - - ~timestamp_field() { - } - - string name; -}; - -unique_ptr open_sample_output_stream( - shared_ptr output, - KeyDictionary &dict, - sample_format_type type, - vector options); - -static inline -unique_ptr open_sample_output_stream( - shared_ptr output, - KeyDictionary &dict, - sample_format_type type) { - return open_sample_output_stream(output, dict, type); -} - -class ThreadSafeSampleOutputStream; - -static inline -unique_ptr thread_safe_sample_output_stream(unique_ptr underlying) { - return make_unique(move(underlying)); -}; - -class sample_exception : public runtime_error { -public: - sample_exception(const string &what) : runtime_error(what) { - } -}; - -class KeyDictionary; - -using SampleKeyVector = vector; -using SampleKeyIndex = SampleKeyVector::size_type; - -struct SampleKey { -private: - SampleKey(const SampleKey& that) = delete; - SampleKey(SampleKeyIndex index, const string &name) : index(index), name(name) { - if (name.length() == 0) { - throw sample_exception("Bad sample key."); - } - } - -public: - friend class KeyDictionary; - - inline - bool operator==(const SampleKey &that) const { - return name == that.name; - } - - const SampleKeyIndex index; - const string name; -}; - -class KeyDictionary { -public: - KeyDictionary() { - } - - ~KeyDictionary() { - std::for_each(keys.begin(), keys.end(), std::default_delete()); - } - KeyDictionary(KeyDictionary& that) = delete; - - SampleKey *indexOf(const string key) { - SampleKeyIndex i = 0; - for (auto ptr = keys.cbegin(); ptr != keys.cend(); ptr++, i++) { - if ((*ptr)->name == key) { - return *ptr; - } - } - - i = keys.size(); - auto sample_key = new SampleKey(i, key); - keys.push_back(sample_key); - - return sample_key; - } - - SampleKey *at(SampleKeyIndex i) const { - if (i >= keys.size()) { - throw sample_exception("Out of bounds"); - } - - return keys.at(i); - } - - vector findIndexes(SampleKeyVector &keys) { - vector indexes; - - for (auto &key: keys) { - auto index = indexOf(key->name); - indexes.push_back(index); - } - - return indexes; - } - - inline - SampleKeyVector::const_iterator end() const { - return keys.cend(); - } - - inline - SampleKeyVector::const_iterator begin() const { - return keys.cbegin(); - } - -// string nameOf(SampleKeyIndex index) { -// return keys.at(index).name; -// } - - inline - SampleKeyVector::size_type size() const { - return keys.size(); - } - - inline - bool empty() const { - return keys.empty(); - } - -private: - SampleKeyVector keys; -}; - -class SampleRecord { -public: - typedef vector> vec; - - SampleRecord(KeyDictionary &dict) : dict(dict) { - } - - SampleRecord(KeyDictionary &dict, vec values) - : dict(dict), values(values) { - } - - inline - vec::const_iterator cbegin() const { - return values.cbegin(); - } - - inline - vec::const_iterator cend() const { - return values.cend(); - } - - inline - bool empty() const { - return values.empty(); - } - - const o at(const SampleKey *key) const { - SampleKeyIndex index = key->index; - if (index >= values.size()) { - return o(); - } - - return values.at(index); - } - - void set(const SampleKey *key, const std::string &value) { - values.resize(max(values.size(), key->index + 1)); - - values.at(key->index).reset(value); - } - - template - const o lexical_at(const SampleKey *key) const { - auto value = at(key); - - if (!value) { - return o(); - } - - return o(boost::lexical_cast(value.get())); - } - - string to_string() const { - SampleKeyIndex i = 0; - string s; - for (auto ptr = values.begin(); ptr != values.end(); ptr++, i++) { - auto o = *ptr; - - if (!o) { - continue; - } - - auto value = o.get(); - - s += dict.at(i)->name + " = " + value + ", "; - } - return s; - } - - KeyDictionary &dict; -private: - vec values; -}; - -class SampleOutputStream { -public: - virtual void write(SampleRecord const &sample) = 0; -}; - -class VectorSampleOutputStream : public SampleOutputStream { - -public: - virtual void write(SampleRecord const &sample) override; - -public: - vector samples; -}; - -class ThreadSafeSampleOutputStream : public SampleOutputStream { -public: - ThreadSafeSampleOutputStream(unique_ptr underlying); - - ~ThreadSafeSampleOutputStream() { - } - - void write(SampleRecord const &sample) override; - -private: - unique_ptr underlying; - std::mutex mutex; -}; - -class CsvSampleOutputStream : public SampleOutputStream { -public: - CsvSampleOutputStream(shared_ptr stream, KeyDictionary &dict); - - void write(SampleRecord const &sample); - - const KeyDictionary &getDict() { - return dict; - } - -private: - void writeHeader(); - - KeyDictionary &dict; - shared_ptr stream; - bool headerWritten; -}; - -class JsonSampleOutputStream : public SampleOutputStream { -public: - JsonSampleOutputStream(shared_ptr stream, KeyDictionary &dict); - - void write(SampleRecord const &sample) override; - -private: - KeyDictionary &dict; - shared_ptr stream; -}; - -class KeyValueSampleOutputStream : public SampleOutputStream { -public: - KeyValueSampleOutputStream(shared_ptr stream, KeyDictionary &dict); - - void write(SampleRecord const &sample) override; - -private: - KeyDictionary &dict; - shared_ptr stream; -}; - -class RrdSampleOutputStream : public SampleOutputStream { -public: - RrdSampleOutputStream(shared_ptr stream, KeyDictionary &dict, const SampleKey *timestamp_key, o output_fields); - - void write(SampleRecord const &sample) override; - -private: - vector keys; - shared_ptr stream; - const SampleKey *timestamp_key; -}; - -class SqlSampleOutputStream : public SampleOutputStream { -public: - SqlSampleOutputStream(shared_ptr stream, KeyDictionary &dict, string table_name); - - void write(SampleRecord const &sample) override; - -private: - KeyDictionary &dict; - shared_ptr stream; - const string table_name; -}; - -class SampleStreamParser { -public: - // TODO: return number of samples found for progress indication? - virtual void process(mutable_buffers_1 buffer) = 0; - - virtual sample_format_type type() { - return type_; - } - -protected: - sample_format_type type_; - - SampleStreamParser(const sample_format_type type) : type_(type) { - } -}; - -class KeyValueSampleStreamParser : public SampleStreamParser { - -public: - KeyValueSampleStreamParser(shared_ptr output, KeyDictionary &dict) : - SampleStreamParser(sample_format_type::CSV), output(output), dict(dict), - line(make_shared>()) { - } - - void process(mutable_buffers_1 buffer) override; - -private: - void process_line(shared_ptr> packet); - - static const uint8_t packet_delimiter = '\n'; - KeyDictionary &dict; - shared_ptr output; - shared_ptr> line; -}; - -class AutoSampleParser : public SampleStreamParser { -public: - AutoSampleParser(shared_ptr output, KeyDictionary &dict); - -private: - unique_ptr parser; - unique_ptr keyValueParser; -public: - virtual void process(mutable_buffers_1 buffer); -}; - -} -} diff --git a/sensor/include/trygvis/sensor.h b/sensor/include/trygvis/sensor.h new file mode 100644 index 0000000..42362b2 --- /dev/null +++ b/sensor/include/trygvis/sensor.h @@ -0,0 +1,207 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace trygvis { +namespace sensor { + +using namespace std; + +template +using o = boost::optional; + +enum class sample_format_type { + AUTO, + CSV, + KEY_VALUE, + JSON, + SQL, + RRD, +}; + +string to_string(const sample_format_type &arg); + +std::ostream& operator<<(std::ostream& os, sample_format_type const& type); + +std::istream& operator>>(std::istream& is, sample_format_type& type); + +class sample_exception : public runtime_error { +public: + sample_exception(const string &what) : runtime_error(what) { + } +}; + +class KeyDictionary; + +class SampleKey; + +class KeyDictionary; + +using SampleKeyVector = vector; +using SampleKeyIndex = SampleKeyVector::size_type; + +struct SampleKey { +private: + SampleKey(const SampleKey& that) = delete; + SampleKey(SampleKeyIndex index, const string &name) : index(index), name(name) { + if (name.length() == 0) { + throw sample_exception("Bad sample key."); + } + } + +public: + friend class KeyDictionary; + + inline + bool operator==(const SampleKey &that) const { + return name == that.name; + } + + const SampleKeyIndex index; + const string name; +}; + +class KeyDictionary { +public: + KeyDictionary() { + } + + ~KeyDictionary() { + std::for_each(keys.begin(), keys.end(), std::default_delete()); + } + KeyDictionary(KeyDictionary& that) = delete; + + SampleKey *indexOf(const string key) { + SampleKeyIndex i = 0; + for (auto ptr = keys.cbegin(); ptr != keys.cend(); ptr++, i++) { + if ((*ptr)->name == key) { + return *ptr; + } + } + + i = keys.size(); + auto sample_key = new SampleKey(i, key); + keys.push_back(sample_key); + + return sample_key; + } + + SampleKey *at(SampleKeyIndex i) const { + if (i >= keys.size()) { + throw sample_exception("Out of bounds"); + } + + return keys.at(i); + } + + vector findIndexes(SampleKeyVector &keys) { + vector indexes; + + for (auto &key: keys) { + auto index = indexOf(key->name); + indexes.push_back(index); + } + + return indexes; + } + + inline + SampleKeyVector::const_iterator end() const { + return keys.cend(); + } + + inline + SampleKeyVector::const_iterator begin() const { + return keys.cbegin(); + } + +// string nameOf(SampleKeyIndex index) { +// return keys.at(index).name; +// } + + inline + SampleKeyVector::size_type size() const { + return keys.size(); + } + + inline + bool empty() const { + return keys.empty(); + } + +private: + SampleKeyVector keys; +}; + +class SampleRecord { +public: + typedef vector> vec; + + SampleRecord(KeyDictionary &dict) : dict(dict) { + } + + SampleRecord(KeyDictionary &dict, vec values) + : dict(dict), values(values) { + } + + inline + vec::const_iterator cbegin() const { + return values.cbegin(); + } + + inline + vec::const_iterator cend() const { + return values.cend(); + } + + inline + bool empty() const { + return values.empty(); + } + + const o at(const SampleKey *key) const { + SampleKeyIndex index = key->index; + if (index >= values.size()) { + return o(); + } + + return values.at(index); + } + + void set(const SampleKey *key, const std::string &value) { + values.resize(max(values.size(), key->index + 1)); + + values.at(key->index).reset(value); + } + + template + const o lexical_at(const SampleKey *key) const; + + string to_string() const { + SampleKeyIndex i = 0; + string s; + for (auto ptr = values.begin(); ptr != values.end(); ptr++, i++) { + auto o = *ptr; + + if (!o) { + continue; + } + + auto value = o.get(); + + s += dict.at(i)->name + " = " + value + ", "; + } + return s; + } + + KeyDictionary &dict; +private: + vec values; +}; + +} +} diff --git a/sensor/include/trygvis/sensor/io.h b/sensor/include/trygvis/sensor/io.h new file mode 100644 index 0000000..7db7615 --- /dev/null +++ b/sensor/include/trygvis/sensor/io.h @@ -0,0 +1,210 @@ +#pragma once + +#include "trygvis/sensor.h" + +#include +#include "boost/asio/buffer.hpp" + +namespace trygvis { +namespace sensor { +namespace io { + +using namespace std; +using namespace boost::asio; + +class output_fields; + +class timestamp_field; + +class SampleOutputStream { +public: + virtual void write(SampleRecord const &sample) = 0; +}; + +class VectorSampleOutputStream : public SampleOutputStream { + +public: + virtual void write(SampleRecord const &sample) override; + +public: + vector samples; +}; + +class ThreadSafeSampleOutputStream : public SampleOutputStream { +public: + ThreadSafeSampleOutputStream(unique_ptr underlying); + + ~ThreadSafeSampleOutputStream() { + } + + void write(SampleRecord const &sample) override; + +private: + unique_ptr underlying; + std::mutex mutex; +}; + +class CsvSampleOutputStream : public SampleOutputStream { +public: + CsvSampleOutputStream(shared_ptr stream, KeyDictionary &dict); + + void write(SampleRecord const &sample); + + const KeyDictionary &getDict() { + return dict; + } + +private: + void writeHeader(); + + KeyDictionary &dict; + shared_ptr stream; + bool headerWritten; +}; + +class JsonSampleOutputStream : public SampleOutputStream { +public: + JsonSampleOutputStream(shared_ptr stream, KeyDictionary &dict); + + void write(SampleRecord const &sample) override; + +private: + KeyDictionary &dict; + shared_ptr stream; +}; + +class KeyValueSampleOutputStream : public SampleOutputStream { +public: + KeyValueSampleOutputStream(shared_ptr stream, KeyDictionary &dict); + + void write(SampleRecord const &sample) override; + +private: + KeyDictionary &dict; + shared_ptr stream; +}; + +class RrdSampleOutputStream : public SampleOutputStream { +public: + RrdSampleOutputStream(shared_ptr stream, KeyDictionary &dict, const SampleKey *timestamp_key, o output_fields); + + void write(SampleRecord const &sample) override; + +private: + vector keys; + shared_ptr stream; + const SampleKey *timestamp_key; +}; + +class SqlSampleOutputStream : public SampleOutputStream { +public: + SqlSampleOutputStream(shared_ptr stream, KeyDictionary &dict, string table_name); + + void write(SampleRecord const &sample) override; + +private: + KeyDictionary &dict; + shared_ptr stream; + const string table_name; +}; + +class SampleStreamParser { +public: + // TODO: return number of samples found for progress indication? + virtual void process(mutable_buffers_1 buffer) = 0; + + virtual sample_format_type type() { + return type_; + } + +protected: + sample_format_type type_; + + SampleStreamParser(const sample_format_type type) : type_(type) { + } +}; + +class KeyValueSampleStreamParser : public SampleStreamParser { + +public: + KeyValueSampleStreamParser(shared_ptr output, KeyDictionary &dict) : + SampleStreamParser(sample_format_type::CSV), output(output), dict(dict), + line(make_shared>()) { + } + + void process(mutable_buffers_1 buffer) override; + +private: + void process_line(shared_ptr> packet); + + static const uint8_t packet_delimiter = '\n'; + KeyDictionary &dict; + shared_ptr output; + shared_ptr> line; +}; + +class AutoSampleParser : public SampleStreamParser { +public: + AutoSampleParser(shared_ptr output, KeyDictionary &dict); + +private: + unique_ptr parser; + unique_ptr keyValueParser; +public: + virtual void process(mutable_buffers_1 buffer); +}; + +class sample_output_stream_option { +public: + virtual ~sample_output_stream_option() { + }; +}; + +class output_fields : public sample_output_stream_option { +public: + ~output_fields() { + } + + vector fields; +}; + +class timestamp_field : public sample_output_stream_option { +public: + timestamp_field(string name) : name(name) { + } + + ~timestamp_field() { + } + + string name; +}; + +// TODO: rename to open_sample_stream_parser +unique_ptr open_sample_input_stream( + shared_ptr output, + KeyDictionary &dict, + sample_format_type type = sample_format_type::AUTO); + +unique_ptr open_sample_output_stream( + shared_ptr output, + KeyDictionary &dict, + sample_format_type type, +vector options); + +static inline +unique_ptr open_sample_output_stream( + shared_ptr output, + KeyDictionary &dict, + sample_format_type type) { +return open_sample_output_stream(output, dict, type); +} + +static inline +unique_ptr thread_safe_sample_output_stream(unique_ptr underlying) { + return make_unique(move(underlying)); +}; + + +} +} +} diff --git a/sensor/main/SensorSample.cpp b/sensor/main/SensorSample.cpp deleted file mode 100644 index 5f0e9c6..0000000 --- a/sensor/main/SensorSample.cpp +++ /dev/null @@ -1,454 +0,0 @@ -#include "trygvis/SensorSample.h" - -#include "json.hpp" -#include -#include -#include - -namespace trygvis { -namespace sensor { - -using namespace std; -using json = nlohmann::json; - -void VectorSampleOutputStream::write(SampleRecord const &sample) { - if (sample.empty()) { - return; - } - - samples.emplace_back(sample); -} - -CsvSampleOutputStream::CsvSampleOutputStream(shared_ptr stream, KeyDictionary &dict) - : stream(move(stream)), headerWritten(false), dict(dict) { -} - -void CsvSampleOutputStream::write(SampleRecord const &sample) { - // Skip empty records - if (sample.empty()) { - return; - } - - // Build the dict with the keys from the first sample. - if (dict.empty()) { - SampleKeyIndex index = 0; - auto ptr = sample.cbegin(); - while (ptr != sample.cend()) { - auto o = *ptr; - - if (o) { - auto name = sample.dict.at(index)->name; - dict.indexOf(name); - } - - ptr++; - index++; - } - } - - if (!headerWritten) { - writeHeader(); - headerWritten = true; - } - - auto &s = *stream.get(); - - auto it = dict.begin(); - while (it != dict.end()) { - if (it != dict.begin()) { - s << ","; - } - - auto key = *it++; - auto sampleKey = sample.dict.indexOf(key->name); - auto o = sample.at(sampleKey); - - if (o) { - s << o.get(); - } - } - - s << endl; -} - -void CsvSampleOutputStream::writeHeader() { - auto &s = *stream.get(); - - auto i = dict.begin(); - while (i != dict.end()) { - s << (*i)->name; - - i++; - - if (i != dict.end()) { - s << ","; - } - } - - s << endl; -} - -JsonSampleOutputStream::JsonSampleOutputStream(shared_ptr stream, KeyDictionary &dict) : - dict(dict), stream(move(stream)) { -} - -void JsonSampleOutputStream::write(SampleRecord const &sample) { - // Skip empty records - if (sample.empty()) { - return; - } - - json doc({}); - - if (!dict.empty()) { - for (auto &key: dict) { - auto sampleKey = sample.dict.indexOf(key->name); - - auto value = sample.at(sampleKey); - - if (value) { - doc[key->name] = value.get(); - } - } - } else { - for (auto &sampleKey: sample.dict) { - auto o = sample.at(sampleKey); - - if (o) { - // Make sure that the key is registered in the dictionary - dict.indexOf(sampleKey->name); - doc[sampleKey->name] = o.get(); - } - } - } - - *stream.get() << doc << endl; -} - -KeyValueSampleOutputStream::KeyValueSampleOutputStream(shared_ptr stream, KeyDictionary &dict) : - dict(dict), stream(move(stream)) { -} - -void KeyValueSampleOutputStream::write(SampleRecord const &sample) { - // Skip empty records - if (sample.empty()) { - return; - } - - auto &s = *stream.get(); - - bool first = true; - if (!dict.empty()) { - for (auto &key: dict) { - auto sampleKey = sample.dict.indexOf(key->name); - - auto value = sample.at(sampleKey); - - if (value) { - if (first) { - first = false; - } else { - s << ", "; - } - s << key->name << "=" << value.get(); - } - } - } else { - for (auto &sampleKey: sample.dict) { - auto o = sample.at(sampleKey); - - if (o) { - if (first) { - first = false; - } else { - s << ", "; - } - // Make sure that the key is registered in the dictionary - dict.indexOf(sampleKey->name); - s << sampleKey->name << "=" << o.get(); - } - } - } - - *stream.get() << endl; -} - -RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr stream, KeyDictionary &dict, const SampleKey* timestamp_key, o output_fields) : - stream(move(stream)), timestamp_key(timestamp_key) { - if (output_fields) { - for (auto field : output_fields.get()->fields) { - keys.emplace_back(dict.indexOf(field)); - } - } else { - for (auto key : dict) { - keys.emplace_back(key); - } - } -} - -void RrdSampleOutputStream::write(SampleRecord const &sample) { - // Skip empty records - if (sample.empty()) { - return; - } - - auto &s = *stream.get(); - - auto timestampO = sample.at(timestamp_key); - - if (!timestampO) { - return; - } - - auto timestamp = timestampO.get(); - - s << timestamp; - - bool first = true; - for (auto &key: keys) { - if (key == timestamp_key) { - continue; - } - - auto value = sample.at(key); - - if (first) { - s << "@"; - first = false; - } else { - s << ":"; - } - - s << (value ? value.get() : "U"); - } - - *stream.get() << endl; -} - -SqlSampleOutputStream::SqlSampleOutputStream(shared_ptr stream, KeyDictionary &dict, string table_name) : - dict(dict), stream(move(stream)), table_name(table_name) { -} - -void SqlSampleOutputStream::write(SampleRecord const &values) { - throw sample_exception("deimplemented"); - -// string fs, vs; -// -// fs.reserve(1024); -// vs.reserve(1024); -// -// if (filter_fields) { -// auto i = fields.begin(); -// -// while (i != fields.end()) { -// auto field = *i; -// -// fs += field; -// -// auto value = values.find(field); -// -// if (value != values.end()) { -// vs += "'" + value->second + "'"; -// } else { -// vs += "NULL"; -// } -// -// i++; -// -// if (i != fields.end()) { -// fs += ","; -// vs += ","; -// } -// } -// } else { -// auto i = values.begin(); -// while (i != values.end()) { -// auto v = *i++; -// -// fs += v.first; -// vs += "'" + v.second + "'"; -// -// if (i != values.end()) { -// fs += ","; -// vs += ","; -// } -// } -// } -// -// (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl; -} - -void KeyValueSampleStreamParser::process(mutable_buffers_1 buffer) { - - size_t size = buffer_size(buffer); - - if (size == 0 && line->size()) { - process_line(line); - line = make_shared>(); - return; - } - - auto data = boost::asio::buffer_cast(buffer); - - for (int i = 0; i < size; i++) { - uint8_t b = data[i]; - - if (b == packet_delimiter) { - process_line(line); - line = make_shared>(); - } else { - line->push_back(b); - } - } - -} - -void KeyValueSampleStreamParser::process_line(shared_ptr> packet) { - auto timestamp = std::chrono::system_clock::now().time_since_epoch().count(); - auto s = std::string((char *) packet->data(), packet->size()); - - static const boost::regex e("([#_a-zA-Z0-9]+) *= *([0-9]+)"); - - auto start = s.cbegin(); - auto end = s.cend(); - boost::match_results what; - boost::match_flag_type flags = boost::match_default; - - SampleRecord sample(dict); - - while (regex_search(start, end, what, e, flags)) { - auto name = static_cast(what[1]); - auto value = static_cast(what[2]); - start = what[0].second; - - auto key = dict.indexOf(name); - sample.set(key, value); - - flags |= boost::match_prev_avail; - flags |= boost::match_not_bob; - } - - output->write(sample); -} - -AutoSampleParser::AutoSampleParser(shared_ptr output, KeyDictionary &dict) : - SampleStreamParser(sample_format_type::AUTO), keyValueParser(new KeyValueSampleStreamParser(output, dict)) { - // Directly select the parser now until we have more than one parser - parser = std::move(keyValueParser); - type_ = sample_format_type::KEY_VALUE; -} - -void AutoSampleParser::process(mutable_buffers_1 buffer) { - if (parser) { - parser->process(buffer); - } else { - throw runtime_error("Not implemented yet"); - } -} - -string to_string(const sample_format_type &arg) { - if (arg == sample_format_type::AUTO) - return "auto"; - else if (arg == sample_format_type::CSV) - return "csv"; - else if (arg == sample_format_type::JSON) - return "json"; - else if (arg == sample_format_type::KEY_VALUE) - return "key-value"; - else if (arg == sample_format_type::SQL) - return "sql"; - else if (arg == sample_format_type::RRD) - return "rrd"; - else - return "unknown"; -} - -std::ostream& operator<<(std::ostream& os, sample_format_type const& type) { - return os << to_string(type); -} - -std::istream& operator>>(std::istream& is, sample_format_type& type) { - string s; - is >> s; - - if (s == "auto") { - type = sample_format_type::AUTO; - } else if (s == "csv") { - type = sample_format_type::CSV; - } else if (s == "key-value") { - type = sample_format_type::KEY_VALUE; - } else if (s == "json") { - type = sample_format_type::JSON; - } else if (s == "sql") { - type = sample_format_type::SQL; - } else if (s == "rrd") { - type = sample_format_type::RRD; - } - - return is; -} - -unique_ptr open_sample_input_stream( - shared_ptr output, - KeyDictionary &dict, - sample_format_type type) { - if (type == sample_format_type::KEY_VALUE) { - return make_unique(output, dict); - } else if (type == sample_format_type::AUTO) { - return make_unique(output, dict); - } else { - throw sample_exception("No parser for format type: " + to_string(type)); - } -} - -template -o find_option(vector &options) { - for (sample_output_stream_option *& option : options) { - T *x = dynamic_cast(option); - - if (x != nullptr) { - return o(x); - } - } - - return o(); -} - -unique_ptr open_sample_output_stream( - shared_ptr output, - KeyDictionary &dict, - sample_format_type type, - vector options) { - - if (type == sample_format_type::CSV) { - return make_unique(output, dict); - } else if (type == sample_format_type::KEY_VALUE) { - return make_unique(output, dict); - } else if (type == sample_format_type::JSON) { - return make_unique(output, dict); - } else if (type == sample_format_type::RRD) { - o of = find_option(options); - - o tsf = find_option(options); - - auto timestamp_key = dict.indexOf(tsf ? tsf.get()->name : "timestamp"); - - return make_unique(output, dict, timestamp_key, of); -// } else if (type == sample_format_type::SQL) { -// return make_unique(dict, move(output), table_name); - } else { - throw sample_exception("No writer for format type: " + to_string(type)); - } -} - -ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr underlying) : underlying(move(underlying)) { -} - -void ThreadSafeSampleOutputStream::write(SampleRecord const &sample) { - std::unique_lock lock(mutex); - - underlying->write(sample); -} - -} -} diff --git a/sensor/main/io.cpp b/sensor/main/io.cpp new file mode 100644 index 0000000..18040f1 --- /dev/null +++ b/sensor/main/io.cpp @@ -0,0 +1,418 @@ +#include "trygvis/sensor/io.h" + +#include +#include +#include +#include +#include "json.hpp" +#include "boost/regex.hpp" + +namespace trygvis { +namespace sensor { +namespace io { + +using namespace std; +using json = nlohmann::json; + +ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr underlying) + : underlying(move(underlying)) { +} + +void ThreadSafeSampleOutputStream::write(SampleRecord const &sample) { + std::unique_lock lock(mutex); + + underlying->write(sample); +} + +void VectorSampleOutputStream::write(SampleRecord const &sample) { + if (sample.empty()) { + return; + } + + samples.emplace_back(sample); +} + +CsvSampleOutputStream::CsvSampleOutputStream(shared_ptr stream, KeyDictionary &dict) + : stream(move(stream)), headerWritten(false), dict(dict) { +} + +void CsvSampleOutputStream::write(SampleRecord const &sample) { +// Skip empty records + if (sample.empty()) { + return; + } + +// Build the dict with the keys from the first sample. + if (dict.empty()) { + SampleKeyIndex index = 0; + auto ptr = sample.cbegin(); + while (ptr != sample.cend()) { + auto o = *ptr; + + if (o) { + auto name = sample.dict.at(index)->name; + dict.indexOf(name); + } + + ptr++; + index++; + } + } + + if (!headerWritten) { + writeHeader(); + headerWritten = true; + } + + auto &s = *stream.get(); + + auto it = dict.begin(); + while (it != dict.end()) { + if (it != dict.begin()) { + s << ","; + } + + auto key = *it++; + auto sampleKey = sample.dict.indexOf(key->name); + auto o = sample.at(sampleKey); + + if (o) { + s << o.get(); + } + } + + s << endl; +} + +void CsvSampleOutputStream::writeHeader() { + auto &s = *stream.get(); + + auto i = dict.begin(); + while (i != dict.end()) { + s << (*i)->name; + + i++; + + if (i != dict.end()) { + s << ","; + } + } + + s << endl; +} + +JsonSampleOutputStream::JsonSampleOutputStream(shared_ptr stream, KeyDictionary &dict) : + dict(dict), stream(move(stream)) { +} + +void JsonSampleOutputStream::write(SampleRecord const &sample) { +// Skip empty records + if (sample.empty()) { + return; + } + + json doc({}); + + if (!dict.empty()) { + for (auto &key: dict) { + auto sampleKey = sample.dict.indexOf(key->name); + + auto value = sample.at(sampleKey); + + if (value) { + doc[key->name] = value.get(); + } + } + } else { + for (auto &sampleKey: sample.dict) { + auto o = sample.at(sampleKey); + + if (o) { +// Make sure that the key is registered in the dictionary + dict.indexOf(sampleKey->name); + doc[sampleKey->name] = o.get(); + } + } + } + + *stream.get() << doc << endl; +} + +KeyValueSampleOutputStream::KeyValueSampleOutputStream(shared_ptr stream, KeyDictionary &dict) : + dict(dict), stream(move(stream)) { +} + +void KeyValueSampleOutputStream::write(SampleRecord const &sample) { +// Skip empty records + if (sample.empty()) { + return; + } + + auto &s = *stream.get(); + + bool first = true; + if (!dict.empty()) { + for (auto &key: dict) { + auto sampleKey = sample.dict.indexOf(key->name); + + auto value = sample.at(sampleKey); + + if (value) { + if (first) { + first = false; + } else { + s << ", "; + } + s << key->name << "=" << value.get(); + } + } + } else { + for (auto &sampleKey: sample.dict) { + auto o = sample.at(sampleKey); + + if (o) { + if (first) { + first = false; + } else { + s << ", "; + } +// Make sure that the key is registered in the dictionary + dict.indexOf(sampleKey->name); + s << sampleKey->name << "=" << o.get(); + } + } + } + + *stream.get() << endl; +} + +RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr stream, KeyDictionary &dict, const SampleKey *timestamp_key, o output_fields) + : + stream(move(stream)), timestamp_key(timestamp_key) { + if (output_fields) { + for (auto field : output_fields.get()->fields) { + keys.emplace_back(dict.indexOf(field)); + } + } else { + for (auto key : dict) { + keys.emplace_back(key); + } + } +} + +void RrdSampleOutputStream::write(SampleRecord const &sample) { +// Skip empty records + if (sample.empty()) { + return; + } + + auto &s = *stream.get(); + + auto timestampO = sample.at(timestamp_key); + + if (!timestampO) { + return; + } + + auto timestamp = timestampO.get(); + + s << timestamp; + + bool first = true; + for (auto &key: keys) { + if (key == timestamp_key) { + continue; + } + + auto value = sample.at(key); + + if (first) { + s << "@"; + first = false; + } else { + s << ":"; + } + + s << (value ? value.get() : "U"); + } + + *stream.get() << endl; +} + +SqlSampleOutputStream::SqlSampleOutputStream(shared_ptr stream, KeyDictionary &dict, string table_name) : + dict(dict), stream(move(stream)), table_name(table_name) { +} + +void SqlSampleOutputStream::write(SampleRecord const &values) { + throw sample_exception("deimplemented"); + +// string fs, vs; +// +// fs.reserve(1024); +// vs.reserve(1024); +// +// if (filter_fields) { +// auto i = fields.begin(); +// +// while (i != fields.end()) { +// auto field = *i; +// +// fs += field; +// +// auto value = values.find(field); +// +// if (value != values.end()) { +// vs += "'" + value->second + "'"; +// } else { +// vs += "NULL"; +// } +// +// i++; +// +// if (i != fields.end()) { +// fs += ","; +// vs += ","; +// } +// } +// } else { +// auto i = values.begin(); +// while (i != values.end()) { +// auto v = *i++; +// +// fs += v.first; +// vs += "'" + v.second + "'"; +// +// if (i != values.end()) { +// fs += ","; +// vs += ","; +// } +// } +// } +// +// (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl; +} + +void KeyValueSampleStreamParser::process(mutable_buffers_1 buffer) { + + size_t size = buffer_size(buffer); + + if (size == 0 && line->size()) { + process_line(line); + line = make_shared>(); + return; + } + + auto data = boost::asio::buffer_cast(buffer); + + for (int i = 0; i < size; i++) { + uint8_t b = data[i]; + + if (b == packet_delimiter) { + process_line(line); + line = make_shared>(); + } else { + line->push_back(b); + } + } + +} + +void KeyValueSampleStreamParser::process_line(shared_ptr> packet) { + auto timestamp = std::chrono::system_clock::now().time_since_epoch().count(); + auto s = std::string((char *) packet->data(), packet->size()); + + static const boost::regex e("([#_a-zA-Z0-9]+) *= *([0-9]+)"); + + auto start = s.cbegin(); + auto end = s.cend(); + boost::match_results what; + boost::match_flag_type flags = boost::match_default; + + SampleRecord sample(dict); + + while (regex_search(start, end, what, e, flags)) { + auto name = static_cast(what[1]); + auto value = static_cast(what[2]); + start = what[0].second; + + auto key = dict.indexOf(name); + sample.set(key, value); + + flags |= boost::match_prev_avail; + flags |= boost::match_not_bob; + } + + output->write(sample); +} + +AutoSampleParser::AutoSampleParser(shared_ptr output, KeyDictionary &dict) : + SampleStreamParser(sample_format_type::AUTO), keyValueParser(new KeyValueSampleStreamParser(output, dict)) { + // Directly select the parser now until we have more than one parser + parser = std::move(keyValueParser); + type_ = sample_format_type::KEY_VALUE; +} + +void AutoSampleParser::process(mutable_buffers_1 buffer) { + if (parser) { + parser->process(buffer); + } else { + throw runtime_error("Not implemented yet"); + } +} + +unique_ptr open_sample_input_stream( + shared_ptr output, + KeyDictionary &dict, + sample_format_type type) { + if (type == sample_format_type::KEY_VALUE) { + return make_unique(output, dict); + } else if (type == sample_format_type::AUTO) { + return make_unique(output, dict); + } else { + throw sample_exception("No parser for format type: " + to_string(type)); + } +} + +template +o find_option(vector &options) { + for (sample_output_stream_option *&option : options) { + T *x = dynamic_cast(option); + + if (x != nullptr) { + return o(x); + } + } + + return o(); +} + +unique_ptr open_sample_output_stream( + shared_ptr output, + KeyDictionary &dict, + sample_format_type type, + vector options) { + + if (type == sample_format_type::CSV) { + return make_unique(output, dict); + } else if (type == sample_format_type::KEY_VALUE) { + return make_unique(output, dict); + } else if (type == sample_format_type::JSON) { + return make_unique(output, dict); + } else if (type == sample_format_type::RRD) { + o of = find_option(options); + + o tsf = find_option(options); + + auto timestamp_key = dict.indexOf(tsf ? tsf.get()->name : "timestamp"); + + return make_unique(output, dict, timestamp_key, of); +// } else if (type == sample_format_type::SQL) { +// return make_unique(dict, move(output), table_name); + } else { + throw sample_exception("No writer for format type: " + to_string(type)); + } +} + +} +} +} \ No newline at end of file diff --git a/sensor/main/sensor.cpp b/sensor/main/sensor.cpp new file mode 100644 index 0000000..a773e0b --- /dev/null +++ b/sensor/main/sensor.cpp @@ -0,0 +1,79 @@ +#include "trygvis/sensor.h" + +#include "json.hpp" +#include +#include +#include +#include + +namespace trygvis { +namespace sensor { + +using namespace std; + +string to_string(const sample_format_type &arg) { + if (arg == sample_format_type::AUTO) + return "auto"; + else if (arg == sample_format_type::CSV) + return "csv"; + else if (arg == sample_format_type::JSON) + return "json"; + else if (arg == sample_format_type::KEY_VALUE) + return "key-value"; + else if (arg == sample_format_type::SQL) + return "sql"; + else if (arg == sample_format_type::RRD) + return "rrd"; + else + return "unknown"; +} + +std::ostream& operator<<(std::ostream& os, sample_format_type const& type) { + return os << to_string(type); +} + +std::istream& operator>>(std::istream& is, sample_format_type& type) { + string s; + is >> s; + + if (s == "auto") { + type = sample_format_type::AUTO; + } else if (s == "csv") { + type = sample_format_type::CSV; + } else if (s == "key-value") { + type = sample_format_type::KEY_VALUE; + } else if (s == "json") { + type = sample_format_type::JSON; + } else if (s == "sql") { + type = sample_format_type::SQL; + } else if (s == "rrd") { + type = sample_format_type::RRD; + } + + return is; +} + +template<> +const o SampleRecord::lexical_at(const SampleKey *key) const { + auto value = at(key); + + if (!value) { + return o(); + } + + return o(boost::lexical_cast(value.get())); +} +// +//template +//const o SampleRecord::lexical_at(const SampleKey *key) const { +// auto value = at(key); +// +// if (!value) { +// return o(); +// } +// +// return o(boost::lexical_cast(value.get())); +//} + +} +} -- cgit v1.2.3