diff options
Diffstat (limited to 'sensor')
-rw-r--r-- | sensor/CMakeLists.txt | 7 | ||||
-rw-r--r-- | sensor/include/trygvis/sensor.h | 207 | ||||
-rw-r--r-- | sensor/include/trygvis/sensor/io.h (renamed from sensor/include/trygvis/SensorSample.h) | 331 | ||||
-rw-r--r-- | sensor/main/io.cpp (renamed from sensor/main/SensorSample.cpp) | 94 | ||||
-rw-r--r-- | sensor/main/sensor.cpp | 79 |
5 files changed, 378 insertions, 340 deletions
diff --git a/sensor/CMakeLists.txt b/sensor/CMakeLists.txt index 7cee42b..14804f1 100644 --- a/sensor/CMakeLists.txt +++ b/sensor/CMakeLists.txt @@ -1,11 +1,12 @@ -file(GLOB INCLUDES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} include/*.h) +file(GLOB_RECURSE INCLUDES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} include/*.h) add_library(trygvis-sensor - main/SensorSample.cpp + main/sensor.cpp + main/io.cpp ${INCLUDES}) include_directories("${PROJECT_SOURCE_DIR}/json/src") include_directories(include) # Boost -find_package(Boost COMPONENTS regex system program_options REQUIRED) +find_package(Boost COMPONENTS regex system REQUIRED) diff --git a/sensor/include/trygvis/sensor.h b/sensor/include/trygvis/sensor.h new file mode 100644 index 0000000..42362b2 --- /dev/null +++ b/sensor/include/trygvis/sensor.h @@ -0,0 +1,207 @@ +#pragma once + +#include <exception> +#include <string> +#include <iostream> +#include <memory> +#include <boost/optional/optional.hpp> + +namespace trygvis { +namespace sensor { + +using namespace std; + +template<typename A> +using o = boost::optional<A>; + +enum class sample_format_type { + AUTO, + CSV, + KEY_VALUE, + JSON, + SQL, + RRD, +}; + +string to_string(const sample_format_type &arg); + +std::ostream& operator<<(std::ostream& os, sample_format_type const& type); + +std::istream& operator>>(std::istream& is, sample_format_type& type); + +class sample_exception : public runtime_error { +public: + sample_exception(const string &what) : runtime_error(what) { + } +}; + +class KeyDictionary; + +class SampleKey; + +class KeyDictionary; + +using SampleKeyVector = vector<SampleKey *>; +using SampleKeyIndex = SampleKeyVector::size_type; + +struct SampleKey { +private: + SampleKey(const SampleKey& that) = delete; + SampleKey(SampleKeyIndex index, const string &name) : index(index), name(name) { + if (name.length() == 0) { + throw sample_exception("Bad sample key."); + } + } + +public: + friend class KeyDictionary; + + inline + bool operator==(const SampleKey &that) const { + return name == that.name; + } + + const SampleKeyIndex index; + const string name; +}; + +class KeyDictionary { +public: + KeyDictionary() { + } + + ~KeyDictionary() { + std::for_each(keys.begin(), keys.end(), std::default_delete<SampleKey>()); + } + KeyDictionary(KeyDictionary& that) = delete; + + SampleKey *indexOf(const string key) { + SampleKeyIndex i = 0; + for (auto ptr = keys.cbegin(); ptr != keys.cend(); ptr++, i++) { + if ((*ptr)->name == key) { + return *ptr; + } + } + + i = keys.size(); + auto sample_key = new SampleKey(i, key); + keys.push_back(sample_key); + + return sample_key; + } + + SampleKey *at(SampleKeyIndex i) const { + if (i >= keys.size()) { + throw sample_exception("Out of bounds"); + } + + return keys.at(i); + } + + vector<SampleKey *> findIndexes(SampleKeyVector &keys) { + vector<SampleKey *> indexes; + + for (auto &key: keys) { + auto index = indexOf(key->name); + indexes.push_back(index); + } + + return indexes; + } + + inline + SampleKeyVector::const_iterator end() const { + return keys.cend(); + } + + inline + SampleKeyVector::const_iterator begin() const { + return keys.cbegin(); + } + +// string nameOf(SampleKeyIndex index) { +// return keys.at(index).name; +// } + + inline + SampleKeyVector::size_type size() const { + return keys.size(); + } + + inline + bool empty() const { + return keys.empty(); + } + +private: + SampleKeyVector keys; +}; + +class SampleRecord { +public: + typedef vector<o<string>> vec; + + SampleRecord(KeyDictionary &dict) : dict(dict) { + } + + SampleRecord(KeyDictionary &dict, vec values) + : dict(dict), values(values) { + } + + inline + vec::const_iterator cbegin() const { + return values.cbegin(); + } + + inline + vec::const_iterator cend() const { + return values.cend(); + } + + inline + bool empty() const { + return values.empty(); + } + + const o<string> at(const SampleKey *key) const { + SampleKeyIndex index = key->index; + if (index >= values.size()) { + return o<string>(); + } + + return values.at(index); + } + + void set(const SampleKey *key, const std::string &value) { + values.resize(max(values.size(), key->index + 1)); + + values.at(key->index).reset(value); + } + + template<class A> + const o<A> lexical_at(const SampleKey *key) const; + + string to_string() const { + SampleKeyIndex i = 0; + string s; + for (auto ptr = values.begin(); ptr != values.end(); ptr++, i++) { + auto o = *ptr; + + if (!o) { + continue; + } + + auto value = o.get(); + + s += dict.at(i)->name + " = " + value + ", "; + } + return s; + } + + KeyDictionary &dict; +private: + vec values; +}; + +} +} diff --git a/sensor/include/trygvis/SensorSample.h b/sensor/include/trygvis/sensor/io.h index 438e2ae..7db7615 100644 --- a/sensor/include/trygvis/SensorSample.h +++ b/sensor/include/trygvis/sensor/io.h @@ -1,285 +1,20 @@ #pragma once -#include <ostream> -#include <vector> -#include <map> -#include <map> -#include <memory> -#include <boost/asio/buffer.hpp> -#include <boost/optional.hpp> -#include <boost/lexical_cast.hpp> -#include <functional> +#include "trygvis/sensor.h" + #include <mutex> +#include "boost/asio/buffer.hpp" namespace trygvis { namespace sensor { +namespace io { using namespace std; using namespace boost::asio; -template<typename A> -using o = boost::optional<A>; - -enum class sample_format_type { - AUTO, - CSV, - KEY_VALUE, - JSON, - SQL, - RRD, -}; - -string to_string(const sample_format_type &arg); - -std::ostream& operator<<(std::ostream& os, sample_format_type const& type); - -std::istream& operator>>(std::istream& is, sample_format_type& type); - -class SampleStreamParser; - -class SampleOutputStream; - -class KeyDictionary; - -class SampleKey; - -// TODO: rename to open_sample_stream_parser -unique_ptr<SampleStreamParser> open_sample_input_stream( - shared_ptr<SampleOutputStream> output, - KeyDictionary &dict, - sample_format_type type = sample_format_type::AUTO); - -class sample_output_stream_option { -public: - virtual ~sample_output_stream_option() { - }; -}; +class output_fields; -class output_fields : public sample_output_stream_option { -public: -// output_fields() { -// } -// -// output_fields(std::vector<string>::iterator begin, std::vector<string>::iterator end) : -// fields(begin, end) { -// } - - ~output_fields() { - } - - vector<string> fields; -}; - - -class timestamp_field : public sample_output_stream_option { -public: - timestamp_field(string name) : name(name) { - } - - ~timestamp_field() { - } - - string name; -}; - -unique_ptr<SampleOutputStream> open_sample_output_stream( - shared_ptr<ostream> output, - KeyDictionary &dict, - sample_format_type type, - vector<sample_output_stream_option *> options); - -static inline -unique_ptr<SampleOutputStream> open_sample_output_stream( - shared_ptr<ostream> output, - KeyDictionary &dict, - sample_format_type type) { - return open_sample_output_stream(output, dict, type); -} - -class ThreadSafeSampleOutputStream; - -static inline -unique_ptr<ThreadSafeSampleOutputStream> thread_safe_sample_output_stream(unique_ptr<SampleOutputStream> underlying) { - return make_unique<ThreadSafeSampleOutputStream>(move(underlying)); -}; - -class sample_exception : public runtime_error { -public: - sample_exception(const string &what) : runtime_error(what) { - } -}; - -class KeyDictionary; - -using SampleKeyVector = vector<SampleKey *>; -using SampleKeyIndex = SampleKeyVector::size_type; - -struct SampleKey { -private: - SampleKey(const SampleKey& that) = delete; - SampleKey(SampleKeyIndex index, const string &name) : index(index), name(name) { - if (name.length() == 0) { - throw sample_exception("Bad sample key."); - } - } - -public: - friend class KeyDictionary; - - inline - bool operator==(const SampleKey &that) const { - return name == that.name; - } - - const SampleKeyIndex index; - const string name; -}; - -class KeyDictionary { -public: - KeyDictionary() { - } - - ~KeyDictionary() { - std::for_each(keys.begin(), keys.end(), std::default_delete<SampleKey>()); - } - KeyDictionary(KeyDictionary& that) = delete; - - SampleKey *indexOf(const string key) { - SampleKeyIndex i = 0; - for (auto ptr = keys.cbegin(); ptr != keys.cend(); ptr++, i++) { - if ((*ptr)->name == key) { - return *ptr; - } - } - - i = keys.size(); - auto sample_key = new SampleKey(i, key); - keys.push_back(sample_key); - - return sample_key; - } - - SampleKey *at(SampleKeyIndex i) const { - if (i >= keys.size()) { - throw sample_exception("Out of bounds"); - } - - return keys.at(i); - } - - vector<SampleKey *> findIndexes(SampleKeyVector &keys) { - vector<SampleKey *> indexes; - - for (auto &key: keys) { - auto index = indexOf(key->name); - indexes.push_back(index); - } - - return indexes; - } - - inline - SampleKeyVector::const_iterator end() const { - return keys.cend(); - } - - inline - SampleKeyVector::const_iterator begin() const { - return keys.cbegin(); - } - -// string nameOf(SampleKeyIndex index) { -// return keys.at(index).name; -// } - - inline - SampleKeyVector::size_type size() const { - return keys.size(); - } - - inline - bool empty() const { - return keys.empty(); - } - -private: - SampleKeyVector keys; -}; - -class SampleRecord { -public: - typedef vector<o<string>> vec; - - SampleRecord(KeyDictionary &dict) : dict(dict) { - } - - SampleRecord(KeyDictionary &dict, vec values) - : dict(dict), values(values) { - } - - inline - vec::const_iterator cbegin() const { - return values.cbegin(); - } - - inline - vec::const_iterator cend() const { - return values.cend(); - } - - inline - bool empty() const { - return values.empty(); - } - - const o<string> at(const SampleKey *key) const { - SampleKeyIndex index = key->index; - if (index >= values.size()) { - return o<string>(); - } - - return values.at(index); - } - - void set(const SampleKey *key, const std::string &value) { - values.resize(max(values.size(), key->index + 1)); - - values.at(key->index).reset(value); - } - - template<class A> - const o<A> lexical_at(const SampleKey *key) const { - auto value = at(key); - - if (!value) { - return o<A>(); - } - - return o<A>(boost::lexical_cast<A>(value.get())); - } - - string to_string() const { - SampleKeyIndex i = 0; - string s; - for (auto ptr = values.begin(); ptr != values.end(); ptr++, i++) { - auto o = *ptr; - - if (!o) { - continue; - } - - auto value = o.get(); - - s += dict.at(i)->name + " = " + value + ", "; - } - return s; - } - - KeyDictionary &dict; -private: - vec values; -}; +class timestamp_field; class SampleOutputStream { public: @@ -318,7 +53,7 @@ public: const KeyDictionary &getDict() { return dict; } - + private: void writeHeader(); @@ -419,5 +154,57 @@ public: virtual void process(mutable_buffers_1 buffer); }; +class sample_output_stream_option { +public: + virtual ~sample_output_stream_option() { + }; +}; + +class output_fields : public sample_output_stream_option { +public: + ~output_fields() { + } + + vector<string> fields; +}; + +class timestamp_field : public sample_output_stream_option { +public: + timestamp_field(string name) : name(name) { + } + + ~timestamp_field() { + } + + string name; +}; + +// TODO: rename to open_sample_stream_parser +unique_ptr<SampleStreamParser> open_sample_input_stream( + shared_ptr<SampleOutputStream> output, + KeyDictionary &dict, + sample_format_type type = sample_format_type::AUTO); + +unique_ptr<SampleOutputStream> open_sample_output_stream( + shared_ptr<ostream> output, + KeyDictionary &dict, + sample_format_type type, +vector<sample_output_stream_option *> options); + +static inline +unique_ptr<SampleOutputStream> open_sample_output_stream( + shared_ptr<ostream> output, + KeyDictionary &dict, + sample_format_type type) { +return open_sample_output_stream(output, dict, type); +} + +static inline +unique_ptr<ThreadSafeSampleOutputStream> thread_safe_sample_output_stream(unique_ptr<SampleOutputStream> underlying) { + return make_unique<ThreadSafeSampleOutputStream>(move(underlying)); +}; + + +} } } diff --git a/sensor/main/SensorSample.cpp b/sensor/main/io.cpp index 5f0e9c6..18040f1 100644 --- a/sensor/main/SensorSample.cpp +++ b/sensor/main/io.cpp @@ -1,16 +1,29 @@ -#include "trygvis/SensorSample.h" +#include "trygvis/sensor/io.h" +#include <ostream> +#include <vector> +#include <map> +#include <mutex> #include "json.hpp" -#include <set> -#include <boost/regex.hpp> -#include <chrono> +#include "boost/regex.hpp" namespace trygvis { namespace sensor { +namespace io { using namespace std; using json = nlohmann::json; +ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying) + : underlying(move(underlying)) { +} + +void ThreadSafeSampleOutputStream::write(SampleRecord const &sample) { + std::unique_lock<std::mutex> lock(mutex); + + underlying->write(sample); +} + void VectorSampleOutputStream::write(SampleRecord const &sample) { if (sample.empty()) { return; @@ -24,12 +37,12 @@ CsvSampleOutputStream::CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDict } void CsvSampleOutputStream::write(SampleRecord const &sample) { - // Skip empty records +// Skip empty records if (sample.empty()) { return; } - // Build the dict with the keys from the first sample. +// Build the dict with the keys from the first sample. if (dict.empty()) { SampleKeyIndex index = 0; auto ptr = sample.cbegin(); @@ -93,7 +106,7 @@ JsonSampleOutputStream::JsonSampleOutputStream(shared_ptr<ostream> stream, KeyDi } void JsonSampleOutputStream::write(SampleRecord const &sample) { - // Skip empty records +// Skip empty records if (sample.empty()) { return; } @@ -115,7 +128,7 @@ void JsonSampleOutputStream::write(SampleRecord const &sample) { auto o = sample.at(sampleKey); if (o) { - // Make sure that the key is registered in the dictionary +// Make sure that the key is registered in the dictionary dict.indexOf(sampleKey->name); doc[sampleKey->name] = o.get(); } @@ -130,7 +143,7 @@ KeyValueSampleOutputStream::KeyValueSampleOutputStream(shared_ptr<ostream> strea } void KeyValueSampleOutputStream::write(SampleRecord const &sample) { - // Skip empty records +// Skip empty records if (sample.empty()) { return; } @@ -163,7 +176,7 @@ void KeyValueSampleOutputStream::write(SampleRecord const &sample) { } else { s << ", "; } - // Make sure that the key is registered in the dictionary +// Make sure that the key is registered in the dictionary dict.indexOf(sampleKey->name); s << sampleKey->name << "=" << o.get(); } @@ -173,7 +186,8 @@ void KeyValueSampleOutputStream::write(SampleRecord const &sample) { *stream.get() << endl; } -RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey* timestamp_key, o<output_fields *> output_fields) : +RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey *timestamp_key, o<output_fields *> output_fields) + : stream(move(stream)), timestamp_key(timestamp_key) { if (output_fields) { for (auto field : output_fields.get()->fields) { @@ -187,7 +201,7 @@ RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr<ostream> stream, KeyDict } void RrdSampleOutputStream::write(SampleRecord const &sample) { - // Skip empty records +// Skip empty records if (sample.empty()) { return; } @@ -311,7 +325,7 @@ void KeyValueSampleStreamParser::process_line(shared_ptr<vector<uint8_t>> packet auto start = s.cbegin(); auto end = s.cend(); - boost::match_results<std::string::const_iterator> what; + boost::match_results <std::string::const_iterator> what; boost::match_flag_type flags = boost::match_default; SampleRecord sample(dict); @@ -346,48 +360,6 @@ void AutoSampleParser::process(mutable_buffers_1 buffer) { } } -string to_string(const sample_format_type &arg) { - if (arg == sample_format_type::AUTO) - return "auto"; - else if (arg == sample_format_type::CSV) - return "csv"; - else if (arg == sample_format_type::JSON) - return "json"; - else if (arg == sample_format_type::KEY_VALUE) - return "key-value"; - else if (arg == sample_format_type::SQL) - return "sql"; - else if (arg == sample_format_type::RRD) - return "rrd"; - else - return "unknown"; -} - -std::ostream& operator<<(std::ostream& os, sample_format_type const& type) { - return os << to_string(type); -} - -std::istream& operator>>(std::istream& is, sample_format_type& type) { - string s; - is >> s; - - if (s == "auto") { - type = sample_format_type::AUTO; - } else if (s == "csv") { - type = sample_format_type::CSV; - } else if (s == "key-value") { - type = sample_format_type::KEY_VALUE; - } else if (s == "json") { - type = sample_format_type::JSON; - } else if (s == "sql") { - type = sample_format_type::SQL; - } else if (s == "rrd") { - type = sample_format_type::RRD; - } - - return is; -} - unique_ptr<SampleStreamParser> open_sample_input_stream( shared_ptr<SampleOutputStream> output, KeyDictionary &dict, @@ -403,7 +375,7 @@ unique_ptr<SampleStreamParser> open_sample_input_stream( template<typename T> o<T *> find_option(vector<sample_output_stream_option *> &options) { - for (sample_output_stream_option *& option : options) { + for (sample_output_stream_option *&option : options) { T *x = dynamic_cast<T *>(option); if (x != nullptr) { @@ -441,14 +413,6 @@ unique_ptr<SampleOutputStream> open_sample_output_stream( } } -ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying) : underlying(move(underlying)) { -} - -void ThreadSafeSampleOutputStream::write(SampleRecord const &sample) { - std::unique_lock<std::mutex> lock(mutex); - - underlying->write(sample); -} - } } +}
\ No newline at end of file diff --git a/sensor/main/sensor.cpp b/sensor/main/sensor.cpp new file mode 100644 index 0000000..a773e0b --- /dev/null +++ b/sensor/main/sensor.cpp @@ -0,0 +1,79 @@ +#include "trygvis/sensor.h" + +#include "json.hpp" +#include <set> +#include <boost/regex.hpp> +#include <boost/lexical_cast.hpp> +#include <chrono> + +namespace trygvis { +namespace sensor { + +using namespace std; + +string to_string(const sample_format_type &arg) { + if (arg == sample_format_type::AUTO) + return "auto"; + else if (arg == sample_format_type::CSV) + return "csv"; + else if (arg == sample_format_type::JSON) + return "json"; + else if (arg == sample_format_type::KEY_VALUE) + return "key-value"; + else if (arg == sample_format_type::SQL) + return "sql"; + else if (arg == sample_format_type::RRD) + return "rrd"; + else + return "unknown"; +} + +std::ostream& operator<<(std::ostream& os, sample_format_type const& type) { + return os << to_string(type); +} + +std::istream& operator>>(std::istream& is, sample_format_type& type) { + string s; + is >> s; + + if (s == "auto") { + type = sample_format_type::AUTO; + } else if (s == "csv") { + type = sample_format_type::CSV; + } else if (s == "key-value") { + type = sample_format_type::KEY_VALUE; + } else if (s == "json") { + type = sample_format_type::JSON; + } else if (s == "sql") { + type = sample_format_type::SQL; + } else if (s == "rrd") { + type = sample_format_type::RRD; + } + + return is; +} + +template<> +const o<long> SampleRecord::lexical_at(const SampleKey *key) const { + auto value = at(key); + + if (!value) { + return o<long>(); + } + + return o<long>(boost::lexical_cast<long>(value.get())); +} +// +//template<class A> +//const o<A> SampleRecord::lexical_at(const SampleKey *key) const { +// auto value = at(key); +// +// if (!value) { +// return o<A>(); +// } +// +// return o<A>(boost::lexical_cast<A>(value.get())); +//} + +} +} |