From 8d9760f9bfc8be9b1abad9a8212b14ffd4552fd7 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sat, 28 Mar 2015 21:52:46 +0100 Subject: o Adding timestamp by default when converting samples. --- apps/sample-convert.cpp | 21 ++++++++++----- apps/sample-timestamp.cpp | 2 +- sensor/include/trygvis/sensor.h | 2 +- sensor/include/trygvis/sensor/io.h | 55 +++++++++++++++++++++++++++++++------- sensor/main/io.cpp | 47 +++++++++++++++++--------------- sensor/test/SampleTest.cpp | 22 ++++++++++++--- 6 files changed, 107 insertions(+), 42 deletions(-) diff --git a/apps/sample-convert.cpp b/apps/sample-convert.cpp index c7173a1..1cedf42 100644 --- a/apps/sample-convert.cpp +++ b/apps/sample-convert.cpp @@ -18,6 +18,7 @@ namespace po = boost::program_options; class sample_convert : public app { string fields; string timestamp_field; + bool add_timestamp; public: void add_options(po::options_description_easy_init &options) override { @@ -28,7 +29,8 @@ public: ("output", po::value(&output_file)->default_value("-")) ("output-format", po::value(&output_format)->default_value(sample_format_type::KEY_VALUE)) ("fields", po::value(&fields)) - ("timestamp-field", po::value(×tamp_field)); + ("add-timestamp", po::value(&add_timestamp)->default_value(true)) + ("timestamp-field", po::value(×tamp_field)->default_value("timestamp")); } void add_extra_options(po::options_description &all_options) override { @@ -67,12 +69,10 @@ public: } } - std::vector options; + sample_output_stream_options options; trygvis::sensor::io::timestamp_field_option tf(timestamp_field); - if (!timestamp_field.empty()) { - options.push_back(&tf); - } + options.push_back(&tf); tokenizer<> tok(fields); output_fields_option fs; @@ -81,14 +81,21 @@ public: options.push_back(&fs); } - shared_ptr output = open_sample_output_stream(outputStream, dict, output_format, options); + unique_ptr o = open_sample_output_stream(outputStream, dict, output_format, options); + + if (add_timestamp) { + o = make_unique(move(o), dict, timestamp_field); + } + + shared_ptr output(move(o)); auto input = make_shared(output, dict); char data[100]; while (!inputStream->eof()) { inputStream->get(data[0]); - input->process(boost::asio::buffer(data, 1)); + auto buf = boost::asio::buffer(data, 1); + input->process(buf); } return EXIT_SUCCESS; diff --git a/apps/sample-timestamp.cpp b/apps/sample-timestamp.cpp index 85431ae..cf0b7fe 100644 --- a/apps/sample-timestamp.cpp +++ b/apps/sample-timestamp.cpp @@ -178,7 +178,7 @@ public: return EXIT_FAILURE; } - vector options = {}; + sample_output_stream_options options; unique_ptr unique_output_stream = open_sample_output_stream(shared_ptr(&cout, noop_deleter), dict, parser->type(), options); shared_ptr output_stream{std::move(unique_output_stream)}; shared_ptr p = make_shared(output_stream, dict, timestamp_name, relative_name, relative_resolution, start_time); diff --git a/sensor/include/trygvis/sensor.h b/sensor/include/trygvis/sensor.h index 42362b2..4662bab 100644 --- a/sensor/include/trygvis/sensor.h +++ b/sensor/include/trygvis/sensor.h @@ -75,7 +75,7 @@ public: } KeyDictionary(KeyDictionary& that) = delete; - SampleKey *indexOf(const string key) { + SampleKey *indexOf(const string &key) { SampleKeyIndex i = 0; for (auto ptr = keys.cbegin(); ptr != keys.cend(); ptr++, i++) { if ((*ptr)->name == key) { diff --git a/sensor/include/trygvis/sensor/io.h b/sensor/include/trygvis/sensor/io.h index bbeb80e..f92b800 100644 --- a/sensor/include/trygvis/sensor/io.h +++ b/sensor/include/trygvis/sensor/io.h @@ -37,6 +37,25 @@ public: string name; }; +class sample_output_stream_options : public vector { +public: + ~sample_output_stream_options() { + } + + template + o find_option() const { + for (auto it = begin(); it != end(); ++it) { + T *x = dynamic_cast(*it); + + if (x != nullptr) { + return o(x); + } + } + + return o(); + } +}; + class SampleOutputStream { public: virtual void write(SampleRecord const &sample) = 0; @@ -65,6 +84,20 @@ private: std::mutex mutex; }; +class AddTimestampSampleOutputStream : public SampleOutputStream { +public: + AddTimestampSampleOutputStream(unique_ptr underlying, KeyDictionary &dict, const string ×tamp_name); + + ~AddTimestampSampleOutputStream() { + } + + void write(SampleRecord const &sample) override; + +private: + unique_ptr underlying_; + const SampleKey* timestamp_key; +}; + class CsvSampleOutputStream : public SampleOutputStream { public: CsvSampleOutputStream(shared_ptr stream, KeyDictionary &dict); @@ -132,16 +165,21 @@ private: class SampleStreamParser { public: // TODO: return number of samples found for progress indication? - virtual void process(mutable_buffers_1 buffer) = 0; + virtual void process(mutable_buffers_1 &buffer) = 0; virtual sample_format_type type() { return type_; } + virtual KeyDictionary &dict() { + return dict_; + } + protected: sample_format_type type_; + KeyDictionary &dict_; - SampleStreamParser(const sample_format_type type) : type_(type) { + SampleStreamParser(const sample_format_type type, KeyDictionary &dict) : type_(type), dict_(dict) { } }; @@ -149,17 +187,16 @@ class KeyValueSampleStreamParser : public SampleStreamParser { public: KeyValueSampleStreamParser(shared_ptr output, KeyDictionary &dict) : - SampleStreamParser(sample_format_type::CSV), output(output), dict(dict), + SampleStreamParser(sample_format_type::CSV, dict), output(output), line(make_shared>()) { } - void process(mutable_buffers_1 buffer) override; + void process(mutable_buffers_1 &buffer) override; private: void process_line(shared_ptr> packet); static const uint8_t packet_delimiter = '\n'; - KeyDictionary &dict; shared_ptr output; shared_ptr> line; }; @@ -168,11 +205,11 @@ class AutoSampleParser : public SampleStreamParser { public: AutoSampleParser(shared_ptr output, KeyDictionary &dict); + virtual void process(mutable_buffers_1 &buffer) override; + private: unique_ptr parser; unique_ptr keyValueParser; -public: - virtual void process(mutable_buffers_1 buffer); }; unique_ptr open_sample_stream_parser( @@ -184,14 +221,14 @@ unique_ptr open_sample_output_stream( shared_ptr output, KeyDictionary &dict, sample_format_type type, - vector options); + sample_output_stream_options options); static inline unique_ptr open_sample_output_stream( shared_ptr output, KeyDictionary &dict, sample_format_type type) { - vector options; + sample_output_stream_options options; return open_sample_output_stream(output, dict, type, options); } diff --git a/sensor/main/io.cpp b/sensor/main/io.cpp index cc713b9..e73b9d4 100644 --- a/sensor/main/io.cpp +++ b/sensor/main/io.cpp @@ -1,6 +1,7 @@ #include "trygvis/sensor/io.h" #include +#include #include "json.hpp" #include "boost/tokenizer.hpp" #include @@ -10,6 +11,7 @@ namespace sensor { namespace io { using namespace std; +using namespace std::chrono; using boost::tokenizer; using boost::escaped_list_separator; using json = nlohmann::json; @@ -24,6 +26,22 @@ void ThreadSafeSampleOutputStream::write(SampleRecord const &sample) { underlying->write(sample); } + +AddTimestampSampleOutputStream::AddTimestampSampleOutputStream(unique_ptr underlying, + KeyDictionary &dict, + const string ×tamp_name) : underlying_(move(underlying)), timestamp_key(dict.indexOf(timestamp_name)) { +} + +void AddTimestampSampleOutputStream::write(SampleRecord const &sample) { + auto time_since_epoch = system_clock::now().time_since_epoch(); + auto timestamp = duration_cast(time_since_epoch).count(); + auto timestamp_s = std::to_string(timestamp); + + SampleRecord copy = sample; + copy.set(timestamp_key, timestamp_s); + underlying_->write(copy); +} + void VectorSampleOutputStream::write(SampleRecord const &sample) { if (sample.empty()) { return; @@ -295,7 +313,7 @@ void SqlSampleOutputStream::write(SampleRecord const &values) { // (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl << flush; } -void KeyValueSampleStreamParser::process(mutable_buffers_1 buffer) { +void KeyValueSampleStreamParser::process(mutable_buffers_1 &buffer) { size_t size = buffer_size(buffer); @@ -326,7 +344,7 @@ void KeyValueSampleStreamParser::process_line(shared_ptr> packet typedef tokenizer> Tokenizer; Tokenizer tokens(s); - SampleRecord sample(dict); + SampleRecord sample(dict_); for (auto token : tokens) { auto index = token.find('='); @@ -340,7 +358,7 @@ void KeyValueSampleStreamParser::process_line(shared_ptr> packet auto value = token.substr(index + 1); boost::algorithm::trim(value); - auto key = dict.indexOf(name); + auto key = dict_.indexOf(name); sample.set(key, value); } @@ -348,13 +366,13 @@ void KeyValueSampleStreamParser::process_line(shared_ptr> packet } AutoSampleParser::AutoSampleParser(shared_ptr output, KeyDictionary &dict) : - SampleStreamParser(sample_format_type::AUTO), keyValueParser(new KeyValueSampleStreamParser(output, dict)) { + SampleStreamParser(sample_format_type::AUTO, dict), keyValueParser(new KeyValueSampleStreamParser(output, dict)) { // Directly select the parser now until we have more than one parser parser = std::move(keyValueParser); type_ = sample_format_type::KEY_VALUE; } -void AutoSampleParser::process(mutable_buffers_1 buffer) { +void AutoSampleParser::process(mutable_buffers_1 &buffer) { if (parser) { parser->process(buffer); } else { @@ -375,24 +393,11 @@ unique_ptr open_sample_stream_parser( } } -template -o find_option(vector &options) { - for (sample_output_stream_option *&option : options) { - T *x = dynamic_cast(option); - - if (x != nullptr) { - return o(x); - } - } - - return o(); -} - unique_ptr open_sample_output_stream( shared_ptr output, KeyDictionary &dict, sample_format_type type, - vector options) { + sample_output_stream_options options) { if (type == sample_format_type::CSV) { return make_unique(output, dict); @@ -401,9 +406,9 @@ unique_ptr open_sample_output_stream( } else if (type == sample_format_type::JSON) { return make_unique(output, dict); } else if (type == sample_format_type::RRD) { - o of = find_option(options); + o of = options.find_option(); - o tsf = find_option(options); + o tsf = options.find_option(); auto timestamp_key = dict.indexOf(tsf ? tsf.get()->name : "timestamp"); diff --git a/sensor/test/SampleTest.cpp b/sensor/test/SampleTest.cpp index 2404500..9737d02 100644 --- a/sensor/test/SampleTest.cpp +++ b/sensor/test/SampleTest.cpp @@ -18,7 +18,8 @@ BOOST_AUTO_TEST_CASE(key_value_parser) { auto parser = make_shared(buffer, dict); char data[] = "a=1, b=2, c=3\n"; - parser->process(boost::asio::buffer(data, sizeof(data))); + auto buf = boost::asio::buffer(data, sizeof(data)); + parser->process(buf); BOOST_CHECK_EQUAL(buffer->samples.size(), 1); BOOST_CHECK_EQUAL(dict.size(), 3); auto it = dict.begin(); @@ -38,7 +39,8 @@ BOOST_AUTO_TEST_CASE(key_value_parser2) { auto parser = make_shared(buffer, dict); char data[] = "now=1,sensor 1=0.000999999833333\n"; - parser->process(boost::asio::buffer(data, sizeof(data))); + auto buf = boost::asio::buffer(data, sizeof(data)); + parser->process(buf); BOOST_CHECK_EQUAL(buffer->samples.size(), 1); SampleRecord& sample = buffer->samples[0]; BOOST_CHECK_EQUAL(dict.size(), 2); @@ -65,7 +67,8 @@ BOOST_AUTO_TEST_CASE(key_value_parser_with_custom_dict) { auto parser = make_shared(buffer, dict); char data[] = "a=1, b=2, c=3\n"; - parser->process(boost::asio::buffer(data, sizeof(data))); + auto buf = boost::asio::buffer(data, sizeof(data)); + parser->process(buf); BOOST_CHECK_EQUAL(buffer->samples.size(), 1); BOOST_CHECK_EQUAL(dict.size(), 3); auto it = dict.begin(); @@ -74,3 +77,16 @@ BOOST_AUTO_TEST_CASE(key_value_parser_with_custom_dict) { BOOST_CHECK_EQUAL((*it)->name, "b"); BOOST_CHECK_EQUAL((*it++)->index, 1); } + +BOOST_AUTO_TEST_CASE(type_detection_key_value) { + KeyDictionary dict; + + auto output = make_shared(); + + auto parser = open_sample_stream_parser(output, dict); + + char data[] = "a=1, b=2, c=3\n"; + auto buf = boost::asio::buffer(data, sizeof(data)); + parser->process(buf); + +} -- cgit v1.2.3