From 650fb016ce36cfda2e8073764196655ee6a50567 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sat, 11 Jul 2015 14:30:14 +0200 Subject: o Adding a parser::finish() method that indicates that the stream is done and any possibly buffered data should be parsed and processed. --- sensor/include/trygvis/sensor/io.h | 15 +++++++++----- sensor/main/io.cpp | 40 +++++++++++++++++++++++++++++--------- sensor/test/CMakeLists.txt | 6 ++++-- sensor/test/SampleTest.cpp | 33 ++++++++++++++++++++++++++++++- 4 files changed, 77 insertions(+), 17 deletions(-) diff --git a/sensor/include/trygvis/sensor/io.h b/sensor/include/trygvis/sensor/io.h index f86f2d9..b69bd7a 100644 --- a/sensor/include/trygvis/sensor/io.h +++ b/sensor/include/trygvis/sensor/io.h @@ -206,8 +206,9 @@ private: class SampleStreamParser { public: - // TODO: return number of samples found for progress indication? - virtual void process(mutable_buffers_1 &buffer) = 0; + virtual int process(mutable_buffers_1 &buffer) = 0; + + virtual int finish() = 0; virtual sample_format_type type() { return type_; @@ -233,10 +234,12 @@ public: line(make_shared>()) { } - void process(mutable_buffers_1 &buffer) override; + int process(mutable_buffers_1 &buffer) override; + + int finish() override; private: - void process_line(shared_ptr> packet); + void process_line(shared_ptr> &packet); static const uint8_t packet_delimiter = '\n'; shared_ptr output; @@ -247,7 +250,9 @@ class AutoSampleParser : public SampleStreamParser { public: AutoSampleParser(shared_ptr output, KeyDictionary &dict); - virtual void process(mutable_buffers_1 &buffer) override; + virtual int process(mutable_buffers_1 &buffer) override; + + virtual int finish() override; private: unique_ptr parser; diff --git a/sensor/main/io.cpp b/sensor/main/io.cpp index fa15c96..a37844a 100644 --- a/sensor/main/io.cpp +++ b/sensor/main/io.cpp @@ -363,33 +363,47 @@ void SqlSampleOutputStream::write(SampleRecord const &sample) { s << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl << flush; } -void KeyValueSampleStreamParser::process(mutable_buffers_1 &buffer) { +int KeyValueSampleStreamParser::process(mutable_buffers_1 &buffer) { size_t size = buffer_size(buffer); - if (size == 0 && line->size()) { - process_line(line); - line = make_shared>(); - return; + if (size == 0) { + return 0; } auto data = boost::asio::buffer_cast(buffer); + int count = 0; for (int i = 0; i < size; i++) { uint8_t b = data[i]; - if (b == packet_delimiter) { + if (b == '\0') { + continue; + } else if (b == packet_delimiter) { process_line(line); + count++; line = make_shared>(); } else { line->push_back(b); } } + return count; } -void KeyValueSampleStreamParser::process_line(shared_ptr> packet) { +int KeyValueSampleStreamParser::finish() { + if (line->size()) { + process_line(line); + + return 1; + } + + return 0; +} + +void KeyValueSampleStreamParser::process_line(shared_ptr> &packet) { auto s = std::string((char *) packet->data(), packet->size()); +// boost::algorithm::erase_all(s, "\0"); typedef tokenizer> Tokenizer; Tokenizer tokens(s); @@ -422,9 +436,17 @@ AutoSampleParser::AutoSampleParser(shared_ptr output, KeyDic type_ = sample_format_type::KEY_VALUE; } -void AutoSampleParser::process(mutable_buffers_1 &buffer) { +int AutoSampleParser::process(mutable_buffers_1 &buffer) { + if (parser) { + return parser->process(buffer); + } else { + throw runtime_error("Not implemented yet"); + } +} + +int AutoSampleParser::finish() { if (parser) { - parser->process(buffer); + return parser->finish(); } else { throw runtime_error("Not implemented yet"); } diff --git a/sensor/test/CMakeLists.txt b/sensor/test/CMakeLists.txt index 2e68532..5c2d527 100644 --- a/sensor/test/CMakeLists.txt +++ b/sensor/test/CMakeLists.txt @@ -4,6 +4,8 @@ find_package(Boost COMPONENTS log regex unit_test_framework REQUIRED) file(GLOB TEST_SRCS RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} *Test.cpp) add_definitions(-DBOOST_TEST_DYN_LINK) +enable_testing() + foreach(testSrc ${TEST_SRCS}) get_filename_component(testName ${testSrc} NAME_WE) @@ -21,6 +23,6 @@ foreach(testSrc ${TEST_SRCS}) #Finally add it to test execution - #Notice the WORKING_DIRECTORY and COMMAND add_test(NAME ${testName} - WORKING_DIRECTORY ${CMAKE_BINARY_DIR}/testBin - COMMAND ${CMAKE_BINARY_DIR}/testBin/${testName}) + COMMAND ${testName}) + endforeach(testSrc) diff --git a/sensor/test/SampleTest.cpp b/sensor/test/SampleTest.cpp index 9737d02..8479e75 100644 --- a/sensor/test/SampleTest.cpp +++ b/sensor/test/SampleTest.cpp @@ -20,6 +20,7 @@ BOOST_AUTO_TEST_CASE(key_value_parser) { char data[] = "a=1, b=2, c=3\n"; auto buf = boost::asio::buffer(data, sizeof(data)); parser->process(buf); + parser->finish(); BOOST_CHECK_EQUAL(buffer->samples.size(), 1); BOOST_CHECK_EQUAL(dict.size(), 3); auto it = dict.begin(); @@ -41,9 +42,11 @@ BOOST_AUTO_TEST_CASE(key_value_parser2) { char data[] = "now=1,sensor 1=0.000999999833333\n"; auto buf = boost::asio::buffer(data, sizeof(data)); parser->process(buf); + parser->finish(); + BOOST_CHECK_EQUAL(dict.size(), 2); BOOST_CHECK_EQUAL(buffer->samples.size(), 1); + SampleRecord& sample = buffer->samples[0]; - BOOST_CHECK_EQUAL(dict.size(), 2); auto it = dict.begin(); BOOST_CHECK_EQUAL((*it)->name, "now"); BOOST_CHECK_EQUAL(!sample.at(*it).operator!(), true); @@ -69,6 +72,7 @@ BOOST_AUTO_TEST_CASE(key_value_parser_with_custom_dict) { char data[] = "a=1, b=2, c=3\n"; auto buf = boost::asio::buffer(data, sizeof(data)); parser->process(buf); + parser->finish(); BOOST_CHECK_EQUAL(buffer->samples.size(), 1); BOOST_CHECK_EQUAL(dict.size(), 3); auto it = dict.begin(); @@ -88,5 +92,32 @@ BOOST_AUTO_TEST_CASE(type_detection_key_value) { char data[] = "a=1, b=2, c=3\n"; auto buf = boost::asio::buffer(data, sizeof(data)); parser->process(buf); + parser->finish(); +} + +BOOST_AUTO_TEST_CASE(key_value_parser_without_newline) { + KeyDictionary dict; + + auto buffer = make_shared(); + + auto parser = make_shared(buffer, dict); + + char data[] = "now=1,sensor=123"; + auto buf = boost::asio::buffer(data, sizeof(data)); + parser->process(buf); + parser->finish(); + BOOST_CHECK_EQUAL(dict.size(), 2); + BOOST_CHECK_EQUAL(buffer->samples.size(), 1); + + SampleRecord& sample = buffer->samples[0]; + auto it = dict.begin(); + BOOST_CHECK_EQUAL((*it)->name, "now"); + BOOST_CHECK_EQUAL(!sample.at(*it).operator!(), true); + BOOST_CHECK_EQUAL(sample.at(*it).get(), "1"); + BOOST_CHECK_EQUAL((*it++)->index, 0); + BOOST_CHECK_EQUAL((*it)->name, "sensor"); + BOOST_CHECK_EQUAL(!sample.at(*it).operator!(), true); + BOOST_CHECK_EQUAL(sample.at(*it).get(), "123"); + BOOST_CHECK_EQUAL((*it++)->index, 1); } -- cgit v1.2.3