diff options
Diffstat (limited to 'sensor/main')
-rw-r--r-- | sensor/main/io.cpp | 40 |
1 files changed, 31 insertions, 9 deletions
diff --git a/sensor/main/io.cpp b/sensor/main/io.cpp index fa15c96..a37844a 100644 --- a/sensor/main/io.cpp +++ b/sensor/main/io.cpp @@ -363,33 +363,47 @@ void SqlSampleOutputStream::write(SampleRecord const &sample) { s << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl << flush; } -void KeyValueSampleStreamParser::process(mutable_buffers_1 &buffer) { +int KeyValueSampleStreamParser::process(mutable_buffers_1 &buffer) { size_t size = buffer_size(buffer); - if (size == 0 && line->size()) { - process_line(line); - line = make_shared<vector<uint8_t>>(); - return; + if (size == 0) { + return 0; } auto data = boost::asio::buffer_cast<const uint8_t *>(buffer); + int count = 0; for (int i = 0; i < size; i++) { uint8_t b = data[i]; - if (b == packet_delimiter) { + if (b == '\0') { + continue; + } else if (b == packet_delimiter) { process_line(line); + count++; line = make_shared<vector<uint8_t>>(); } else { line->push_back(b); } } + return count; } -void KeyValueSampleStreamParser::process_line(shared_ptr<vector<uint8_t>> packet) { +int KeyValueSampleStreamParser::finish() { + if (line->size()) { + process_line(line); + + return 1; + } + + return 0; +} + +void KeyValueSampleStreamParser::process_line(shared_ptr<vector<uint8_t>> &packet) { auto s = std::string((char *) packet->data(), packet->size()); +// boost::algorithm::erase_all(s, "\0"); typedef tokenizer<escaped_list_separator<char>> Tokenizer; Tokenizer tokens(s); @@ -422,9 +436,17 @@ AutoSampleParser::AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDic type_ = sample_format_type::KEY_VALUE; } -void AutoSampleParser::process(mutable_buffers_1 &buffer) { +int AutoSampleParser::process(mutable_buffers_1 &buffer) { + if (parser) { + return parser->process(buffer); + } else { + throw runtime_error("Not implemented yet"); + } +} + +int AutoSampleParser::finish() { if (parser) { - parser->process(buffer); + return parser->finish(); } else { throw runtime_error("Not implemented yet"); } |