aboutsummaryrefslogtreecommitdiff
path: root/sensor
diff options
context:
space:
mode:
Diffstat (limited to 'sensor')
-rw-r--r--sensor/CMakeLists.txt7
-rw-r--r--sensor/include/trygvis/sensor.h207
-rw-r--r--sensor/include/trygvis/sensor/io.h (renamed from sensor/include/trygvis/SensorSample.h)331
-rw-r--r--sensor/main/io.cpp (renamed from sensor/main/SensorSample.cpp)94
-rw-r--r--sensor/main/sensor.cpp79
5 files changed, 378 insertions, 340 deletions
diff --git a/sensor/CMakeLists.txt b/sensor/CMakeLists.txt
index 7cee42b..14804f1 100644
--- a/sensor/CMakeLists.txt
+++ b/sensor/CMakeLists.txt
@@ -1,11 +1,12 @@
-file(GLOB INCLUDES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} include/*.h)
+file(GLOB_RECURSE INCLUDES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} include/*.h)
add_library(trygvis-sensor
- main/SensorSample.cpp
+ main/sensor.cpp
+ main/io.cpp
${INCLUDES})
include_directories("${PROJECT_SOURCE_DIR}/json/src")
include_directories(include)
# Boost
-find_package(Boost COMPONENTS regex system program_options REQUIRED)
+find_package(Boost COMPONENTS regex system REQUIRED)
diff --git a/sensor/include/trygvis/sensor.h b/sensor/include/trygvis/sensor.h
new file mode 100644
index 0000000..42362b2
--- /dev/null
+++ b/sensor/include/trygvis/sensor.h
@@ -0,0 +1,207 @@
+#pragma once
+
+#include <exception>
+#include <string>
+#include <iostream>
+#include <memory>
+#include <boost/optional/optional.hpp>
+
+namespace trygvis {
+namespace sensor {
+
+using namespace std;
+
+template<typename A>
+using o = boost::optional<A>;
+
+enum class sample_format_type {
+ AUTO,
+ CSV,
+ KEY_VALUE,
+ JSON,
+ SQL,
+ RRD,
+};
+
+string to_string(const sample_format_type &arg);
+
+std::ostream& operator<<(std::ostream& os, sample_format_type const& type);
+
+std::istream& operator>>(std::istream& is, sample_format_type& type);
+
+class sample_exception : public runtime_error {
+public:
+ sample_exception(const string &what) : runtime_error(what) {
+ }
+};
+
+class KeyDictionary;
+
+class SampleKey;
+
+class KeyDictionary;
+
+using SampleKeyVector = vector<SampleKey *>;
+using SampleKeyIndex = SampleKeyVector::size_type;
+
+struct SampleKey {
+private:
+ SampleKey(const SampleKey& that) = delete;
+ SampleKey(SampleKeyIndex index, const string &name) : index(index), name(name) {
+ if (name.length() == 0) {
+ throw sample_exception("Bad sample key.");
+ }
+ }
+
+public:
+ friend class KeyDictionary;
+
+ inline
+ bool operator==(const SampleKey &that) const {
+ return name == that.name;
+ }
+
+ const SampleKeyIndex index;
+ const string name;
+};
+
+class KeyDictionary {
+public:
+ KeyDictionary() {
+ }
+
+ ~KeyDictionary() {
+ std::for_each(keys.begin(), keys.end(), std::default_delete<SampleKey>());
+ }
+ KeyDictionary(KeyDictionary& that) = delete;
+
+ SampleKey *indexOf(const string key) {
+ SampleKeyIndex i = 0;
+ for (auto ptr = keys.cbegin(); ptr != keys.cend(); ptr++, i++) {
+ if ((*ptr)->name == key) {
+ return *ptr;
+ }
+ }
+
+ i = keys.size();
+ auto sample_key = new SampleKey(i, key);
+ keys.push_back(sample_key);
+
+ return sample_key;
+ }
+
+ SampleKey *at(SampleKeyIndex i) const {
+ if (i >= keys.size()) {
+ throw sample_exception("Out of bounds");
+ }
+
+ return keys.at(i);
+ }
+
+ vector<SampleKey *> findIndexes(SampleKeyVector &keys) {
+ vector<SampleKey *> indexes;
+
+ for (auto &key: keys) {
+ auto index = indexOf(key->name);
+ indexes.push_back(index);
+ }
+
+ return indexes;
+ }
+
+ inline
+ SampleKeyVector::const_iterator end() const {
+ return keys.cend();
+ }
+
+ inline
+ SampleKeyVector::const_iterator begin() const {
+ return keys.cbegin();
+ }
+
+// string nameOf(SampleKeyIndex index) {
+// return keys.at(index).name;
+// }
+
+ inline
+ SampleKeyVector::size_type size() const {
+ return keys.size();
+ }
+
+ inline
+ bool empty() const {
+ return keys.empty();
+ }
+
+private:
+ SampleKeyVector keys;
+};
+
+class SampleRecord {
+public:
+ typedef vector<o<string>> vec;
+
+ SampleRecord(KeyDictionary &dict) : dict(dict) {
+ }
+
+ SampleRecord(KeyDictionary &dict, vec values)
+ : dict(dict), values(values) {
+ }
+
+ inline
+ vec::const_iterator cbegin() const {
+ return values.cbegin();
+ }
+
+ inline
+ vec::const_iterator cend() const {
+ return values.cend();
+ }
+
+ inline
+ bool empty() const {
+ return values.empty();
+ }
+
+ const o<string> at(const SampleKey *key) const {
+ SampleKeyIndex index = key->index;
+ if (index >= values.size()) {
+ return o<string>();
+ }
+
+ return values.at(index);
+ }
+
+ void set(const SampleKey *key, const std::string &value) {
+ values.resize(max(values.size(), key->index + 1));
+
+ values.at(key->index).reset(value);
+ }
+
+ template<class A>
+ const o<A> lexical_at(const SampleKey *key) const;
+
+ string to_string() const {
+ SampleKeyIndex i = 0;
+ string s;
+ for (auto ptr = values.begin(); ptr != values.end(); ptr++, i++) {
+ auto o = *ptr;
+
+ if (!o) {
+ continue;
+ }
+
+ auto value = o.get();
+
+ s += dict.at(i)->name + " = " + value + ", ";
+ }
+ return s;
+ }
+
+ KeyDictionary &dict;
+private:
+ vec values;
+};
+
+}
+}
diff --git a/sensor/include/trygvis/SensorSample.h b/sensor/include/trygvis/sensor/io.h
index 438e2ae..7db7615 100644
--- a/sensor/include/trygvis/SensorSample.h
+++ b/sensor/include/trygvis/sensor/io.h
@@ -1,285 +1,20 @@
#pragma once
-#include <ostream>
-#include <vector>
-#include <map>
-#include <map>
-#include <memory>
-#include <boost/asio/buffer.hpp>
-#include <boost/optional.hpp>
-#include <boost/lexical_cast.hpp>
-#include <functional>
+#include "trygvis/sensor.h"
+
#include <mutex>
+#include "boost/asio/buffer.hpp"
namespace trygvis {
namespace sensor {
+namespace io {
using namespace std;
using namespace boost::asio;
-template<typename A>
-using o = boost::optional<A>;
-
-enum class sample_format_type {
- AUTO,
- CSV,
- KEY_VALUE,
- JSON,
- SQL,
- RRD,
-};
-
-string to_string(const sample_format_type &arg);
-
-std::ostream& operator<<(std::ostream& os, sample_format_type const& type);
-
-std::istream& operator>>(std::istream& is, sample_format_type& type);
-
-class SampleStreamParser;
-
-class SampleOutputStream;
-
-class KeyDictionary;
-
-class SampleKey;
-
-// TODO: rename to open_sample_stream_parser
-unique_ptr<SampleStreamParser> open_sample_input_stream(
- shared_ptr<SampleOutputStream> output,
- KeyDictionary &dict,
- sample_format_type type = sample_format_type::AUTO);
-
-class sample_output_stream_option {
-public:
- virtual ~sample_output_stream_option() {
- };
-};
+class output_fields;
-class output_fields : public sample_output_stream_option {
-public:
-// output_fields() {
-// }
-//
-// output_fields(std::vector<string>::iterator begin, std::vector<string>::iterator end) :
-// fields(begin, end) {
-// }
-
- ~output_fields() {
- }
-
- vector<string> fields;
-};
-
-
-class timestamp_field : public sample_output_stream_option {
-public:
- timestamp_field(string name) : name(name) {
- }
-
- ~timestamp_field() {
- }
-
- string name;
-};
-
-unique_ptr<SampleOutputStream> open_sample_output_stream(
- shared_ptr<ostream> output,
- KeyDictionary &dict,
- sample_format_type type,
- vector<sample_output_stream_option *> options);
-
-static inline
-unique_ptr<SampleOutputStream> open_sample_output_stream(
- shared_ptr<ostream> output,
- KeyDictionary &dict,
- sample_format_type type) {
- return open_sample_output_stream(output, dict, type);
-}
-
-class ThreadSafeSampleOutputStream;
-
-static inline
-unique_ptr<ThreadSafeSampleOutputStream> thread_safe_sample_output_stream(unique_ptr<SampleOutputStream> underlying) {
- return make_unique<ThreadSafeSampleOutputStream>(move(underlying));
-};
-
-class sample_exception : public runtime_error {
-public:
- sample_exception(const string &what) : runtime_error(what) {
- }
-};
-
-class KeyDictionary;
-
-using SampleKeyVector = vector<SampleKey *>;
-using SampleKeyIndex = SampleKeyVector::size_type;
-
-struct SampleKey {
-private:
- SampleKey(const SampleKey& that) = delete;
- SampleKey(SampleKeyIndex index, const string &name) : index(index), name(name) {
- if (name.length() == 0) {
- throw sample_exception("Bad sample key.");
- }
- }
-
-public:
- friend class KeyDictionary;
-
- inline
- bool operator==(const SampleKey &that) const {
- return name == that.name;
- }
-
- const SampleKeyIndex index;
- const string name;
-};
-
-class KeyDictionary {
-public:
- KeyDictionary() {
- }
-
- ~KeyDictionary() {
- std::for_each(keys.begin(), keys.end(), std::default_delete<SampleKey>());
- }
- KeyDictionary(KeyDictionary& that) = delete;
-
- SampleKey *indexOf(const string key) {
- SampleKeyIndex i = 0;
- for (auto ptr = keys.cbegin(); ptr != keys.cend(); ptr++, i++) {
- if ((*ptr)->name == key) {
- return *ptr;
- }
- }
-
- i = keys.size();
- auto sample_key = new SampleKey(i, key);
- keys.push_back(sample_key);
-
- return sample_key;
- }
-
- SampleKey *at(SampleKeyIndex i) const {
- if (i >= keys.size()) {
- throw sample_exception("Out of bounds");
- }
-
- return keys.at(i);
- }
-
- vector<SampleKey *> findIndexes(SampleKeyVector &keys) {
- vector<SampleKey *> indexes;
-
- for (auto &key: keys) {
- auto index = indexOf(key->name);
- indexes.push_back(index);
- }
-
- return indexes;
- }
-
- inline
- SampleKeyVector::const_iterator end() const {
- return keys.cend();
- }
-
- inline
- SampleKeyVector::const_iterator begin() const {
- return keys.cbegin();
- }
-
-// string nameOf(SampleKeyIndex index) {
-// return keys.at(index).name;
-// }
-
- inline
- SampleKeyVector::size_type size() const {
- return keys.size();
- }
-
- inline
- bool empty() const {
- return keys.empty();
- }
-
-private:
- SampleKeyVector keys;
-};
-
-class SampleRecord {
-public:
- typedef vector<o<string>> vec;
-
- SampleRecord(KeyDictionary &dict) : dict(dict) {
- }
-
- SampleRecord(KeyDictionary &dict, vec values)
- : dict(dict), values(values) {
- }
-
- inline
- vec::const_iterator cbegin() const {
- return values.cbegin();
- }
-
- inline
- vec::const_iterator cend() const {
- return values.cend();
- }
-
- inline
- bool empty() const {
- return values.empty();
- }
-
- const o<string> at(const SampleKey *key) const {
- SampleKeyIndex index = key->index;
- if (index >= values.size()) {
- return o<string>();
- }
-
- return values.at(index);
- }
-
- void set(const SampleKey *key, const std::string &value) {
- values.resize(max(values.size(), key->index + 1));
-
- values.at(key->index).reset(value);
- }
-
- template<class A>
- const o<A> lexical_at(const SampleKey *key) const {
- auto value = at(key);
-
- if (!value) {
- return o<A>();
- }
-
- return o<A>(boost::lexical_cast<A>(value.get()));
- }
-
- string to_string() const {
- SampleKeyIndex i = 0;
- string s;
- for (auto ptr = values.begin(); ptr != values.end(); ptr++, i++) {
- auto o = *ptr;
-
- if (!o) {
- continue;
- }
-
- auto value = o.get();
-
- s += dict.at(i)->name + " = " + value + ", ";
- }
- return s;
- }
-
- KeyDictionary &dict;
-private:
- vec values;
-};
+class timestamp_field;
class SampleOutputStream {
public:
@@ -318,7 +53,7 @@ public:
const KeyDictionary &getDict() {
return dict;
}
-
+
private:
void writeHeader();
@@ -419,5 +154,57 @@ public:
virtual void process(mutable_buffers_1 buffer);
};
+class sample_output_stream_option {
+public:
+ virtual ~sample_output_stream_option() {
+ };
+};
+
+class output_fields : public sample_output_stream_option {
+public:
+ ~output_fields() {
+ }
+
+ vector<string> fields;
+};
+
+class timestamp_field : public sample_output_stream_option {
+public:
+ timestamp_field(string name) : name(name) {
+ }
+
+ ~timestamp_field() {
+ }
+
+ string name;
+};
+
+// TODO: rename to open_sample_stream_parser
+unique_ptr<SampleStreamParser> open_sample_input_stream(
+ shared_ptr<SampleOutputStream> output,
+ KeyDictionary &dict,
+ sample_format_type type = sample_format_type::AUTO);
+
+unique_ptr<SampleOutputStream> open_sample_output_stream(
+ shared_ptr<ostream> output,
+ KeyDictionary &dict,
+ sample_format_type type,
+vector<sample_output_stream_option *> options);
+
+static inline
+unique_ptr<SampleOutputStream> open_sample_output_stream(
+ shared_ptr<ostream> output,
+ KeyDictionary &dict,
+ sample_format_type type) {
+return open_sample_output_stream(output, dict, type);
+}
+
+static inline
+unique_ptr<ThreadSafeSampleOutputStream> thread_safe_sample_output_stream(unique_ptr<SampleOutputStream> underlying) {
+ return make_unique<ThreadSafeSampleOutputStream>(move(underlying));
+};
+
+
+}
}
}
diff --git a/sensor/main/SensorSample.cpp b/sensor/main/io.cpp
index 5f0e9c6..18040f1 100644
--- a/sensor/main/SensorSample.cpp
+++ b/sensor/main/io.cpp
@@ -1,16 +1,29 @@
-#include "trygvis/SensorSample.h"
+#include "trygvis/sensor/io.h"
+#include <ostream>
+#include <vector>
+#include <map>
+#include <mutex>
#include "json.hpp"
-#include <set>
-#include <boost/regex.hpp>
-#include <chrono>
+#include "boost/regex.hpp"
namespace trygvis {
namespace sensor {
+namespace io {
using namespace std;
using json = nlohmann::json;
+ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying)
+ : underlying(move(underlying)) {
+}
+
+void ThreadSafeSampleOutputStream::write(SampleRecord const &sample) {
+ std::unique_lock<std::mutex> lock(mutex);
+
+ underlying->write(sample);
+}
+
void VectorSampleOutputStream::write(SampleRecord const &sample) {
if (sample.empty()) {
return;
@@ -24,12 +37,12 @@ CsvSampleOutputStream::CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDict
}
void CsvSampleOutputStream::write(SampleRecord const &sample) {
- // Skip empty records
+// Skip empty records
if (sample.empty()) {
return;
}
- // Build the dict with the keys from the first sample.
+// Build the dict with the keys from the first sample.
if (dict.empty()) {
SampleKeyIndex index = 0;
auto ptr = sample.cbegin();
@@ -93,7 +106,7 @@ JsonSampleOutputStream::JsonSampleOutputStream(shared_ptr<ostream> stream, KeyDi
}
void JsonSampleOutputStream::write(SampleRecord const &sample) {
- // Skip empty records
+// Skip empty records
if (sample.empty()) {
return;
}
@@ -115,7 +128,7 @@ void JsonSampleOutputStream::write(SampleRecord const &sample) {
auto o = sample.at(sampleKey);
if (o) {
- // Make sure that the key is registered in the dictionary
+// Make sure that the key is registered in the dictionary
dict.indexOf(sampleKey->name);
doc[sampleKey->name] = o.get();
}
@@ -130,7 +143,7 @@ KeyValueSampleOutputStream::KeyValueSampleOutputStream(shared_ptr<ostream> strea
}
void KeyValueSampleOutputStream::write(SampleRecord const &sample) {
- // Skip empty records
+// Skip empty records
if (sample.empty()) {
return;
}
@@ -163,7 +176,7 @@ void KeyValueSampleOutputStream::write(SampleRecord const &sample) {
} else {
s << ", ";
}
- // Make sure that the key is registered in the dictionary
+// Make sure that the key is registered in the dictionary
dict.indexOf(sampleKey->name);
s << sampleKey->name << "=" << o.get();
}
@@ -173,7 +186,8 @@ void KeyValueSampleOutputStream::write(SampleRecord const &sample) {
*stream.get() << endl;
}
-RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey* timestamp_key, o<output_fields *> output_fields) :
+RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey *timestamp_key, o<output_fields *> output_fields)
+ :
stream(move(stream)), timestamp_key(timestamp_key) {
if (output_fields) {
for (auto field : output_fields.get()->fields) {
@@ -187,7 +201,7 @@ RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr<ostream> stream, KeyDict
}
void RrdSampleOutputStream::write(SampleRecord const &sample) {
- // Skip empty records
+// Skip empty records
if (sample.empty()) {
return;
}
@@ -311,7 +325,7 @@ void KeyValueSampleStreamParser::process_line(shared_ptr<vector<uint8_t>> packet
auto start = s.cbegin();
auto end = s.cend();
- boost::match_results<std::string::const_iterator> what;
+ boost::match_results <std::string::const_iterator> what;
boost::match_flag_type flags = boost::match_default;
SampleRecord sample(dict);
@@ -346,48 +360,6 @@ void AutoSampleParser::process(mutable_buffers_1 buffer) {
}
}
-string to_string(const sample_format_type &arg) {
- if (arg == sample_format_type::AUTO)
- return "auto";
- else if (arg == sample_format_type::CSV)
- return "csv";
- else if (arg == sample_format_type::JSON)
- return "json";
- else if (arg == sample_format_type::KEY_VALUE)
- return "key-value";
- else if (arg == sample_format_type::SQL)
- return "sql";
- else if (arg == sample_format_type::RRD)
- return "rrd";
- else
- return "unknown";
-}
-
-std::ostream& operator<<(std::ostream& os, sample_format_type const& type) {
- return os << to_string(type);
-}
-
-std::istream& operator>>(std::istream& is, sample_format_type& type) {
- string s;
- is >> s;
-
- if (s == "auto") {
- type = sample_format_type::AUTO;
- } else if (s == "csv") {
- type = sample_format_type::CSV;
- } else if (s == "key-value") {
- type = sample_format_type::KEY_VALUE;
- } else if (s == "json") {
- type = sample_format_type::JSON;
- } else if (s == "sql") {
- type = sample_format_type::SQL;
- } else if (s == "rrd") {
- type = sample_format_type::RRD;
- }
-
- return is;
-}
-
unique_ptr<SampleStreamParser> open_sample_input_stream(
shared_ptr<SampleOutputStream> output,
KeyDictionary &dict,
@@ -403,7 +375,7 @@ unique_ptr<SampleStreamParser> open_sample_input_stream(
template<typename T>
o<T *> find_option(vector<sample_output_stream_option *> &options) {
- for (sample_output_stream_option *& option : options) {
+ for (sample_output_stream_option *&option : options) {
T *x = dynamic_cast<T *>(option);
if (x != nullptr) {
@@ -441,14 +413,6 @@ unique_ptr<SampleOutputStream> open_sample_output_stream(
}
}
-ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying) : underlying(move(underlying)) {
-}
-
-void ThreadSafeSampleOutputStream::write(SampleRecord const &sample) {
- std::unique_lock<std::mutex> lock(mutex);
-
- underlying->write(sample);
-}
-
}
}
+} \ No newline at end of file
diff --git a/sensor/main/sensor.cpp b/sensor/main/sensor.cpp
new file mode 100644
index 0000000..a773e0b
--- /dev/null
+++ b/sensor/main/sensor.cpp
@@ -0,0 +1,79 @@
+#include "trygvis/sensor.h"
+
+#include "json.hpp"
+#include <set>
+#include <boost/regex.hpp>
+#include <boost/lexical_cast.hpp>
+#include <chrono>
+
+namespace trygvis {
+namespace sensor {
+
+using namespace std;
+
+string to_string(const sample_format_type &arg) {
+ if (arg == sample_format_type::AUTO)
+ return "auto";
+ else if (arg == sample_format_type::CSV)
+ return "csv";
+ else if (arg == sample_format_type::JSON)
+ return "json";
+ else if (arg == sample_format_type::KEY_VALUE)
+ return "key-value";
+ else if (arg == sample_format_type::SQL)
+ return "sql";
+ else if (arg == sample_format_type::RRD)
+ return "rrd";
+ else
+ return "unknown";
+}
+
+std::ostream& operator<<(std::ostream& os, sample_format_type const& type) {
+ return os << to_string(type);
+}
+
+std::istream& operator>>(std::istream& is, sample_format_type& type) {
+ string s;
+ is >> s;
+
+ if (s == "auto") {
+ type = sample_format_type::AUTO;
+ } else if (s == "csv") {
+ type = sample_format_type::CSV;
+ } else if (s == "key-value") {
+ type = sample_format_type::KEY_VALUE;
+ } else if (s == "json") {
+ type = sample_format_type::JSON;
+ } else if (s == "sql") {
+ type = sample_format_type::SQL;
+ } else if (s == "rrd") {
+ type = sample_format_type::RRD;
+ }
+
+ return is;
+}
+
+template<>
+const o<long> SampleRecord::lexical_at(const SampleKey *key) const {
+ auto value = at(key);
+
+ if (!value) {
+ return o<long>();
+ }
+
+ return o<long>(boost::lexical_cast<long>(value.get()));
+}
+//
+//template<class A>
+//const o<A> SampleRecord::lexical_at(const SampleKey *key) const {
+// auto value = at(key);
+//
+// if (!value) {
+// return o<A>();
+// }
+//
+// return o<A>(boost::lexical_cast<A>(value.get()));
+//}
+
+}
+}