aboutsummaryrefslogtreecommitdiff
path: root/sensor/main/io.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'sensor/main/io.cpp')
-rw-r--r--sensor/main/io.cpp47
1 files changed, 26 insertions, 21 deletions
diff --git a/sensor/main/io.cpp b/sensor/main/io.cpp
index cc713b9..e73b9d4 100644
--- a/sensor/main/io.cpp
+++ b/sensor/main/io.cpp
@@ -1,6 +1,7 @@
#include "trygvis/sensor/io.h"
#include <map>
+#include <chrono>
#include "json.hpp"
#include "boost/tokenizer.hpp"
#include <boost/algorithm/string.hpp>
@@ -10,6 +11,7 @@ namespace sensor {
namespace io {
using namespace std;
+using namespace std::chrono;
using boost::tokenizer;
using boost::escaped_list_separator;
using json = nlohmann::json;
@@ -24,6 +26,22 @@ void ThreadSafeSampleOutputStream::write(SampleRecord const &sample) {
underlying->write(sample);
}
+
+AddTimestampSampleOutputStream::AddTimestampSampleOutputStream(unique_ptr<SampleOutputStream> underlying,
+ KeyDictionary &dict,
+ const string &timestamp_name) : underlying_(move(underlying)), timestamp_key(dict.indexOf(timestamp_name)) {
+}
+
+void AddTimestampSampleOutputStream::write(SampleRecord const &sample) {
+ auto time_since_epoch = system_clock::now().time_since_epoch();
+ auto timestamp = duration_cast<seconds>(time_since_epoch).count();
+ auto timestamp_s = std::to_string(timestamp);
+
+ SampleRecord copy = sample;
+ copy.set(timestamp_key, timestamp_s);
+ underlying_->write(copy);
+}
+
void VectorSampleOutputStream::write(SampleRecord const &sample) {
if (sample.empty()) {
return;
@@ -295,7 +313,7 @@ void SqlSampleOutputStream::write(SampleRecord const &values) {
// (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl << flush;
}
-void KeyValueSampleStreamParser::process(mutable_buffers_1 buffer) {
+void KeyValueSampleStreamParser::process(mutable_buffers_1 &buffer) {
size_t size = buffer_size(buffer);
@@ -326,7 +344,7 @@ void KeyValueSampleStreamParser::process_line(shared_ptr<vector<uint8_t>> packet
typedef tokenizer<escaped_list_separator<char>> Tokenizer;
Tokenizer tokens(s);
- SampleRecord sample(dict);
+ SampleRecord sample(dict_);
for (auto token : tokens) {
auto index = token.find('=');
@@ -340,7 +358,7 @@ void KeyValueSampleStreamParser::process_line(shared_ptr<vector<uint8_t>> packet
auto value = token.substr(index + 1);
boost::algorithm::trim(value);
- auto key = dict.indexOf(name);
+ auto key = dict_.indexOf(name);
sample.set(key, value);
}
@@ -348,13 +366,13 @@ void KeyValueSampleStreamParser::process_line(shared_ptr<vector<uint8_t>> packet
}
AutoSampleParser::AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) :
- SampleStreamParser(sample_format_type::AUTO), keyValueParser(new KeyValueSampleStreamParser(output, dict)) {
+ SampleStreamParser(sample_format_type::AUTO, dict), keyValueParser(new KeyValueSampleStreamParser(output, dict)) {
// Directly select the parser now until we have more than one parser
parser = std::move(keyValueParser);
type_ = sample_format_type::KEY_VALUE;
}
-void AutoSampleParser::process(mutable_buffers_1 buffer) {
+void AutoSampleParser::process(mutable_buffers_1 &buffer) {
if (parser) {
parser->process(buffer);
} else {
@@ -375,24 +393,11 @@ unique_ptr<SampleStreamParser> open_sample_stream_parser(
}
}
-template<typename T>
-o<T *> find_option(vector<sample_output_stream_option *> &options) {
- for (sample_output_stream_option *&option : options) {
- T *x = dynamic_cast<T *>(option);
-
- if (x != nullptr) {
- return o<T *>(x);
- }
- }
-
- return o<T *>();
-}
-
unique_ptr<SampleOutputStream> open_sample_output_stream(
shared_ptr<ostream> output,
KeyDictionary &dict,
sample_format_type type,
- vector<sample_output_stream_option *> options) {
+ sample_output_stream_options options) {
if (type == sample_format_type::CSV) {
return make_unique<CsvSampleOutputStream>(output, dict);
@@ -401,9 +406,9 @@ unique_ptr<SampleOutputStream> open_sample_output_stream(
} else if (type == sample_format_type::JSON) {
return make_unique<JsonSampleOutputStream>(output, dict);
} else if (type == sample_format_type::RRD) {
- o<output_fields_option *> of = find_option<output_fields_option>(options);
+ o<output_fields_option *> of = options.find_option<output_fields_option>();
- o<timestamp_field_option *> tsf = find_option<timestamp_field_option>(options);
+ o<timestamp_field_option *> tsf = options.find_option<timestamp_field_option>();
auto timestamp_key = dict.indexOf(tsf ? tsf.get()->name : "timestamp");