aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--apps/sample-convert.cpp21
-rw-r--r--apps/sample-timestamp.cpp2
-rw-r--r--sensor/include/trygvis/sensor.h2
-rw-r--r--sensor/include/trygvis/sensor/io.h55
-rw-r--r--sensor/main/io.cpp47
-rw-r--r--sensor/test/SampleTest.cpp22
6 files changed, 107 insertions, 42 deletions
diff --git a/apps/sample-convert.cpp b/apps/sample-convert.cpp
index c7173a1..1cedf42 100644
--- a/apps/sample-convert.cpp
+++ b/apps/sample-convert.cpp
@@ -18,6 +18,7 @@ namespace po = boost::program_options;
class sample_convert : public app {
string fields;
string timestamp_field;
+ bool add_timestamp;
public:
void add_options(po::options_description_easy_init &options) override {
@@ -28,7 +29,8 @@ public:
("output", po::value<string>(&output_file)->default_value("-"))
("output-format", po::value<sample_format_type>(&output_format)->default_value(sample_format_type::KEY_VALUE))
("fields", po::value<string>(&fields))
- ("timestamp-field", po::value<string>(&timestamp_field));
+ ("add-timestamp", po::value<bool>(&add_timestamp)->default_value(true))
+ ("timestamp-field", po::value<string>(&timestamp_field)->default_value("timestamp"));
}
void add_extra_options(po::options_description &all_options) override {
@@ -67,12 +69,10 @@ public:
}
}
- std::vector<sample_output_stream_option *> options;
+ sample_output_stream_options options;
trygvis::sensor::io::timestamp_field_option tf(timestamp_field);
- if (!timestamp_field.empty()) {
- options.push_back(&tf);
- }
+ options.push_back(&tf);
tokenizer<> tok(fields);
output_fields_option fs;
@@ -81,14 +81,21 @@ public:
options.push_back(&fs);
}
- shared_ptr<SampleOutputStream> output = open_sample_output_stream(outputStream, dict, output_format, options);
+ unique_ptr<SampleOutputStream> o = open_sample_output_stream(outputStream, dict, output_format, options);
+
+ if (add_timestamp) {
+ o = make_unique<AddTimestampSampleOutputStream>(move(o), dict, timestamp_field);
+ }
+
+ shared_ptr<SampleOutputStream> output(move(o));
auto input = make_shared<KeyValueSampleStreamParser>(output, dict);
char data[100];
while (!inputStream->eof()) {
inputStream->get(data[0]);
- input->process(boost::asio::buffer(data, 1));
+ auto buf = boost::asio::buffer(data, 1);
+ input->process(buf);
}
return EXIT_SUCCESS;
diff --git a/apps/sample-timestamp.cpp b/apps/sample-timestamp.cpp
index 85431ae..cf0b7fe 100644
--- a/apps/sample-timestamp.cpp
+++ b/apps/sample-timestamp.cpp
@@ -178,7 +178,7 @@ public:
return EXIT_FAILURE;
}
- vector<sample_output_stream_option*> options = {};
+ sample_output_stream_options options;
unique_ptr<SampleOutputStream> unique_output_stream = open_sample_output_stream(shared_ptr<ostream>(&cout, noop_deleter), dict, parser->type(), options);
shared_ptr<SampleOutputStream> output_stream{std::move(unique_output_stream)};
shared_ptr<SampleOutputStream> p = make_shared<TimestampFixingSampleOutputStream>(output_stream, dict, timestamp_name, relative_name, relative_resolution, start_time);
diff --git a/sensor/include/trygvis/sensor.h b/sensor/include/trygvis/sensor.h
index 42362b2..4662bab 100644
--- a/sensor/include/trygvis/sensor.h
+++ b/sensor/include/trygvis/sensor.h
@@ -75,7 +75,7 @@ public:
}
KeyDictionary(KeyDictionary& that) = delete;
- SampleKey *indexOf(const string key) {
+ SampleKey *indexOf(const string &key) {
SampleKeyIndex i = 0;
for (auto ptr = keys.cbegin(); ptr != keys.cend(); ptr++, i++) {
if ((*ptr)->name == key) {
diff --git a/sensor/include/trygvis/sensor/io.h b/sensor/include/trygvis/sensor/io.h
index bbeb80e..f92b800 100644
--- a/sensor/include/trygvis/sensor/io.h
+++ b/sensor/include/trygvis/sensor/io.h
@@ -37,6 +37,25 @@ public:
string name;
};
+class sample_output_stream_options : public vector<sample_output_stream_option *> {
+public:
+ ~sample_output_stream_options() {
+ }
+
+ template<typename T>
+ o<T *> find_option() const {
+ for (auto it = begin(); it != end(); ++it) {
+ T *x = dynamic_cast<T *>(*it);
+
+ if (x != nullptr) {
+ return o<T *>(x);
+ }
+ }
+
+ return o<T *>();
+ }
+};
+
class SampleOutputStream {
public:
virtual void write(SampleRecord const &sample) = 0;
@@ -65,6 +84,20 @@ private:
std::mutex mutex;
};
+class AddTimestampSampleOutputStream : public SampleOutputStream {
+public:
+ AddTimestampSampleOutputStream(unique_ptr<SampleOutputStream> underlying, KeyDictionary &dict, const string &timestamp_name);
+
+ ~AddTimestampSampleOutputStream() {
+ }
+
+ void write(SampleRecord const &sample) override;
+
+private:
+ unique_ptr<SampleOutputStream> underlying_;
+ const SampleKey* timestamp_key;
+};
+
class CsvSampleOutputStream : public SampleOutputStream {
public:
CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict);
@@ -132,16 +165,21 @@ private:
class SampleStreamParser {
public:
// TODO: return number of samples found for progress indication?
- virtual void process(mutable_buffers_1 buffer) = 0;
+ virtual void process(mutable_buffers_1 &buffer) = 0;
virtual sample_format_type type() {
return type_;
}
+ virtual KeyDictionary &dict() {
+ return dict_;
+ }
+
protected:
sample_format_type type_;
+ KeyDictionary &dict_;
- SampleStreamParser(const sample_format_type type) : type_(type) {
+ SampleStreamParser(const sample_format_type type, KeyDictionary &dict) : type_(type), dict_(dict) {
}
};
@@ -149,17 +187,16 @@ class KeyValueSampleStreamParser : public SampleStreamParser {
public:
KeyValueSampleStreamParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) :
- SampleStreamParser(sample_format_type::CSV), output(output), dict(dict),
+ SampleStreamParser(sample_format_type::CSV, dict), output(output),
line(make_shared<vector<uint8_t>>()) {
}
- void process(mutable_buffers_1 buffer) override;
+ void process(mutable_buffers_1 &buffer) override;
private:
void process_line(shared_ptr<vector<uint8_t>> packet);
static const uint8_t packet_delimiter = '\n';
- KeyDictionary &dict;
shared_ptr<SampleOutputStream> output;
shared_ptr<vector<uint8_t>> line;
};
@@ -168,11 +205,11 @@ class AutoSampleParser : public SampleStreamParser {
public:
AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict);
+ virtual void process(mutable_buffers_1 &buffer) override;
+
private:
unique_ptr<SampleStreamParser> parser;
unique_ptr<KeyValueSampleStreamParser> keyValueParser;
-public:
- virtual void process(mutable_buffers_1 buffer);
};
unique_ptr<SampleStreamParser> open_sample_stream_parser(
@@ -184,14 +221,14 @@ unique_ptr<SampleOutputStream> open_sample_output_stream(
shared_ptr<ostream> output,
KeyDictionary &dict,
sample_format_type type,
- vector<sample_output_stream_option *> options);
+ sample_output_stream_options options);
static inline
unique_ptr<SampleOutputStream> open_sample_output_stream(
shared_ptr<ostream> output,
KeyDictionary &dict,
sample_format_type type) {
- vector<sample_output_stream_option *> options;
+ sample_output_stream_options options;
return open_sample_output_stream(output, dict, type, options);
}
diff --git a/sensor/main/io.cpp b/sensor/main/io.cpp
index cc713b9..e73b9d4 100644
--- a/sensor/main/io.cpp
+++ b/sensor/main/io.cpp
@@ -1,6 +1,7 @@
#include "trygvis/sensor/io.h"
#include <map>
+#include <chrono>
#include "json.hpp"
#include "boost/tokenizer.hpp"
#include <boost/algorithm/string.hpp>
@@ -10,6 +11,7 @@ namespace sensor {
namespace io {
using namespace std;
+using namespace std::chrono;
using boost::tokenizer;
using boost::escaped_list_separator;
using json = nlohmann::json;
@@ -24,6 +26,22 @@ void ThreadSafeSampleOutputStream::write(SampleRecord const &sample) {
underlying->write(sample);
}
+
+AddTimestampSampleOutputStream::AddTimestampSampleOutputStream(unique_ptr<SampleOutputStream> underlying,
+ KeyDictionary &dict,
+ const string &timestamp_name) : underlying_(move(underlying)), timestamp_key(dict.indexOf(timestamp_name)) {
+}
+
+void AddTimestampSampleOutputStream::write(SampleRecord const &sample) {
+ auto time_since_epoch = system_clock::now().time_since_epoch();
+ auto timestamp = duration_cast<seconds>(time_since_epoch).count();
+ auto timestamp_s = std::to_string(timestamp);
+
+ SampleRecord copy = sample;
+ copy.set(timestamp_key, timestamp_s);
+ underlying_->write(copy);
+}
+
void VectorSampleOutputStream::write(SampleRecord const &sample) {
if (sample.empty()) {
return;
@@ -295,7 +313,7 @@ void SqlSampleOutputStream::write(SampleRecord const &values) {
// (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl << flush;
}
-void KeyValueSampleStreamParser::process(mutable_buffers_1 buffer) {
+void KeyValueSampleStreamParser::process(mutable_buffers_1 &buffer) {
size_t size = buffer_size(buffer);
@@ -326,7 +344,7 @@ void KeyValueSampleStreamParser::process_line(shared_ptr<vector<uint8_t>> packet
typedef tokenizer<escaped_list_separator<char>> Tokenizer;
Tokenizer tokens(s);
- SampleRecord sample(dict);
+ SampleRecord sample(dict_);
for (auto token : tokens) {
auto index = token.find('=');
@@ -340,7 +358,7 @@ void KeyValueSampleStreamParser::process_line(shared_ptr<vector<uint8_t>> packet
auto value = token.substr(index + 1);
boost::algorithm::trim(value);
- auto key = dict.indexOf(name);
+ auto key = dict_.indexOf(name);
sample.set(key, value);
}
@@ -348,13 +366,13 @@ void KeyValueSampleStreamParser::process_line(shared_ptr<vector<uint8_t>> packet
}
AutoSampleParser::AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) :
- SampleStreamParser(sample_format_type::AUTO), keyValueParser(new KeyValueSampleStreamParser(output, dict)) {
+ SampleStreamParser(sample_format_type::AUTO, dict), keyValueParser(new KeyValueSampleStreamParser(output, dict)) {
// Directly select the parser now until we have more than one parser
parser = std::move(keyValueParser);
type_ = sample_format_type::KEY_VALUE;
}
-void AutoSampleParser::process(mutable_buffers_1 buffer) {
+void AutoSampleParser::process(mutable_buffers_1 &buffer) {
if (parser) {
parser->process(buffer);
} else {
@@ -375,24 +393,11 @@ unique_ptr<SampleStreamParser> open_sample_stream_parser(
}
}
-template<typename T>
-o<T *> find_option(vector<sample_output_stream_option *> &options) {
- for (sample_output_stream_option *&option : options) {
- T *x = dynamic_cast<T *>(option);
-
- if (x != nullptr) {
- return o<T *>(x);
- }
- }
-
- return o<T *>();
-}
-
unique_ptr<SampleOutputStream> open_sample_output_stream(
shared_ptr<ostream> output,
KeyDictionary &dict,
sample_format_type type,
- vector<sample_output_stream_option *> options) {
+ sample_output_stream_options options) {
if (type == sample_format_type::CSV) {
return make_unique<CsvSampleOutputStream>(output, dict);
@@ -401,9 +406,9 @@ unique_ptr<SampleOutputStream> open_sample_output_stream(
} else if (type == sample_format_type::JSON) {
return make_unique<JsonSampleOutputStream>(output, dict);
} else if (type == sample_format_type::RRD) {
- o<output_fields_option *> of = find_option<output_fields_option>(options);
+ o<output_fields_option *> of = options.find_option<output_fields_option>();
- o<timestamp_field_option *> tsf = find_option<timestamp_field_option>(options);
+ o<timestamp_field_option *> tsf = options.find_option<timestamp_field_option>();
auto timestamp_key = dict.indexOf(tsf ? tsf.get()->name : "timestamp");
diff --git a/sensor/test/SampleTest.cpp b/sensor/test/SampleTest.cpp
index 2404500..9737d02 100644
--- a/sensor/test/SampleTest.cpp
+++ b/sensor/test/SampleTest.cpp
@@ -18,7 +18,8 @@ BOOST_AUTO_TEST_CASE(key_value_parser) {
auto parser = make_shared<KeyValueSampleStreamParser>(buffer, dict);
char data[] = "a=1, b=2, c=3\n";
- parser->process(boost::asio::buffer(data, sizeof(data)));
+ auto buf = boost::asio::buffer(data, sizeof(data));
+ parser->process(buf);
BOOST_CHECK_EQUAL(buffer->samples.size(), 1);
BOOST_CHECK_EQUAL(dict.size(), 3);
auto it = dict.begin();
@@ -38,7 +39,8 @@ BOOST_AUTO_TEST_CASE(key_value_parser2) {
auto parser = make_shared<KeyValueSampleStreamParser>(buffer, dict);
char data[] = "now=1,sensor 1=0.000999999833333\n";
- parser->process(boost::asio::buffer(data, sizeof(data)));
+ auto buf = boost::asio::buffer(data, sizeof(data));
+ parser->process(buf);
BOOST_CHECK_EQUAL(buffer->samples.size(), 1);
SampleRecord& sample = buffer->samples[0];
BOOST_CHECK_EQUAL(dict.size(), 2);
@@ -65,7 +67,8 @@ BOOST_AUTO_TEST_CASE(key_value_parser_with_custom_dict) {
auto parser = make_shared<KeyValueSampleStreamParser>(buffer, dict);
char data[] = "a=1, b=2, c=3\n";
- parser->process(boost::asio::buffer(data, sizeof(data)));
+ auto buf = boost::asio::buffer(data, sizeof(data));
+ parser->process(buf);
BOOST_CHECK_EQUAL(buffer->samples.size(), 1);
BOOST_CHECK_EQUAL(dict.size(), 3);
auto it = dict.begin();
@@ -74,3 +77,16 @@ BOOST_AUTO_TEST_CASE(key_value_parser_with_custom_dict) {
BOOST_CHECK_EQUAL((*it)->name, "b");
BOOST_CHECK_EQUAL((*it++)->index, 1);
}
+
+BOOST_AUTO_TEST_CASE(type_detection_key_value) {
+ KeyDictionary dict;
+
+ auto output = make_shared<VectorSampleOutputStream>();
+
+ auto parser = open_sample_stream_parser(output, dict);
+
+ char data[] = "a=1, b=2, c=3\n";
+ auto buf = boost::asio::buffer(data, sizeof(data));
+ parser->process(buf);
+
+}