From f110a0912efb7245d6d548aa8a5ac0c89bcd9dc0 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Mon, 23 Mar 2015 00:14:56 +0100 Subject: o Replacing regex based parsing with simpler and more correct tokenizing. o Flushing output after each sample. o Adding back tests. --- sensor/main/io.cpp | 61 +++++++++++++++++++++++++++--------------------------- 1 file changed, 30 insertions(+), 31 deletions(-) (limited to 'sensor/main') diff --git a/sensor/main/io.cpp b/sensor/main/io.cpp index 57c0b18..cc713b9 100644 --- a/sensor/main/io.cpp +++ b/sensor/main/io.cpp @@ -1,17 +1,17 @@ #include "trygvis/sensor/io.h" -#include -#include #include -#include #include "json.hpp" -#include "boost/regex.hpp" +#include "boost/tokenizer.hpp" +#include namespace trygvis { namespace sensor { namespace io { using namespace std; +using boost::tokenizer; +using boost::escaped_list_separator; using json = nlohmann::json; ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr underlying) @@ -37,12 +37,12 @@ CsvSampleOutputStream::CsvSampleOutputStream(shared_ptr stream, KeyDict } void CsvSampleOutputStream::write(SampleRecord const &sample) { -// Skip empty records + // Skip empty records if (sample.empty()) { return; } -// Build the dict with the keys from the first sample. + // Build the dict with the keys from the first sample. if (dict.empty()) { SampleKeyIndex index = 0; auto ptr = sample.cbegin(); @@ -81,7 +81,7 @@ void CsvSampleOutputStream::write(SampleRecord const &sample) { } } - s << endl; + s << endl << flush; } void CsvSampleOutputStream::writeHeader() { @@ -98,7 +98,7 @@ void CsvSampleOutputStream::writeHeader() { } } - s << endl; + s << endl << flush; } JsonSampleOutputStream::JsonSampleOutputStream(shared_ptr stream, KeyDictionary &dict) : @@ -106,7 +106,7 @@ JsonSampleOutputStream::JsonSampleOutputStream(shared_ptr stream, KeyDi } void JsonSampleOutputStream::write(SampleRecord const &sample) { -// Skip empty records + // Skip empty records if (sample.empty()) { return; } @@ -128,14 +128,14 @@ void JsonSampleOutputStream::write(SampleRecord const &sample) { auto o = sample.at(sampleKey); if (o) { -// Make sure that the key is registered in the dictionary + // Make sure that the key is registered in the dictionary dict.indexOf(sampleKey->name); doc[sampleKey->name] = o.get(); } } } - *stream.get() << doc << endl; + *stream.get() << doc << endl << flush; } KeyValueSampleOutputStream::KeyValueSampleOutputStream(shared_ptr stream, KeyDictionary &dict) : @@ -143,7 +143,7 @@ KeyValueSampleOutputStream::KeyValueSampleOutputStream(shared_ptr strea } void KeyValueSampleOutputStream::write(SampleRecord const &sample) { -// Skip empty records + // Skip empty records if (sample.empty()) { return; } @@ -176,14 +176,14 @@ void KeyValueSampleOutputStream::write(SampleRecord const &sample) { } else { s << ", "; } -// Make sure that the key is registered in the dictionary + // Make sure that the key is registered in the dictionary dict.indexOf(sampleKey->name); s << sampleKey->name << "=" << o.get(); } } } - *stream.get() << endl; + s << endl << flush; } RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr stream, @@ -204,7 +204,7 @@ RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr stream, } void RrdSampleOutputStream::write(SampleRecord const &sample) { -// Skip empty records + // Skip empty records if (sample.empty()) { return; } @@ -239,7 +239,7 @@ void RrdSampleOutputStream::write(SampleRecord const &sample) { s << (value ? value.get() : "U"); } - *stream.get() << endl; + s << endl << flush; } SqlSampleOutputStream::SqlSampleOutputStream(shared_ptr stream, KeyDictionary &dict, string table_name) : @@ -292,7 +292,7 @@ void SqlSampleOutputStream::write(SampleRecord const &values) { // } // } // -// (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl; +// (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl << flush; } void KeyValueSampleStreamParser::process(mutable_buffers_1 buffer) { @@ -321,28 +321,27 @@ void KeyValueSampleStreamParser::process(mutable_buffers_1 buffer) { } void KeyValueSampleStreamParser::process_line(shared_ptr> packet) { - auto timestamp = std::chrono::system_clock::now().time_since_epoch().count(); auto s = std::string((char *) packet->data(), packet->size()); - static const boost::regex e("([#_a-zA-Z0-9]+) *= *([0-9]+)"); - - auto start = s.cbegin(); - auto end = s.cend(); - boost::match_results what; - boost::match_flag_type flags = boost::match_default; + typedef tokenizer> Tokenizer; + Tokenizer tokens(s); SampleRecord sample(dict); - while (regex_search(start, end, what, e, flags)) { - auto name = static_cast(what[1]); - auto value = static_cast(what[2]); - start = what[0].second; + for (auto token : tokens) { + auto index = token.find('='); + + if (index == string::npos) { + continue; + } + + auto name = token.substr(0, index); + boost::algorithm::trim(name); + auto value = token.substr(index + 1); + boost::algorithm::trim(value); auto key = dict.indexOf(name); sample.set(key, value); - - flags |= boost::match_prev_avail; - flags |= boost::match_not_bob; } output->write(sample); -- cgit v1.2.3