aboutsummaryrefslogtreecommitdiff
path: root/sensor/main/io.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'sensor/main/io.cpp')
-rw-r--r--sensor/main/io.cpp61
1 files changed, 30 insertions, 31 deletions
diff --git a/sensor/main/io.cpp b/sensor/main/io.cpp
index 57c0b18..cc713b9 100644
--- a/sensor/main/io.cpp
+++ b/sensor/main/io.cpp
@@ -1,17 +1,17 @@
#include "trygvis/sensor/io.h"
-#include <ostream>
-#include <vector>
#include <map>
-#include <mutex>
#include "json.hpp"
-#include "boost/regex.hpp"
+#include "boost/tokenizer.hpp"
+#include <boost/algorithm/string.hpp>
namespace trygvis {
namespace sensor {
namespace io {
using namespace std;
+using boost::tokenizer;
+using boost::escaped_list_separator;
using json = nlohmann::json;
ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying)
@@ -37,12 +37,12 @@ CsvSampleOutputStream::CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDict
}
void CsvSampleOutputStream::write(SampleRecord const &sample) {
-// Skip empty records
+ // Skip empty records
if (sample.empty()) {
return;
}
-// Build the dict with the keys from the first sample.
+ // Build the dict with the keys from the first sample.
if (dict.empty()) {
SampleKeyIndex index = 0;
auto ptr = sample.cbegin();
@@ -81,7 +81,7 @@ void CsvSampleOutputStream::write(SampleRecord const &sample) {
}
}
- s << endl;
+ s << endl << flush;
}
void CsvSampleOutputStream::writeHeader() {
@@ -98,7 +98,7 @@ void CsvSampleOutputStream::writeHeader() {
}
}
- s << endl;
+ s << endl << flush;
}
JsonSampleOutputStream::JsonSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict) :
@@ -106,7 +106,7 @@ JsonSampleOutputStream::JsonSampleOutputStream(shared_ptr<ostream> stream, KeyDi
}
void JsonSampleOutputStream::write(SampleRecord const &sample) {
-// Skip empty records
+ // Skip empty records
if (sample.empty()) {
return;
}
@@ -128,14 +128,14 @@ void JsonSampleOutputStream::write(SampleRecord const &sample) {
auto o = sample.at(sampleKey);
if (o) {
-// Make sure that the key is registered in the dictionary
+ // Make sure that the key is registered in the dictionary
dict.indexOf(sampleKey->name);
doc[sampleKey->name] = o.get();
}
}
}
- *stream.get() << doc << endl;
+ *stream.get() << doc << endl << flush;
}
KeyValueSampleOutputStream::KeyValueSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict) :
@@ -143,7 +143,7 @@ KeyValueSampleOutputStream::KeyValueSampleOutputStream(shared_ptr<ostream> strea
}
void KeyValueSampleOutputStream::write(SampleRecord const &sample) {
-// Skip empty records
+ // Skip empty records
if (sample.empty()) {
return;
}
@@ -176,14 +176,14 @@ void KeyValueSampleOutputStream::write(SampleRecord const &sample) {
} else {
s << ", ";
}
-// Make sure that the key is registered in the dictionary
+ // Make sure that the key is registered in the dictionary
dict.indexOf(sampleKey->name);
s << sampleKey->name << "=" << o.get();
}
}
}
- *stream.get() << endl;
+ s << endl << flush;
}
RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr<ostream> stream,
@@ -204,7 +204,7 @@ RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr<ostream> stream,
}
void RrdSampleOutputStream::write(SampleRecord const &sample) {
-// Skip empty records
+ // Skip empty records
if (sample.empty()) {
return;
}
@@ -239,7 +239,7 @@ void RrdSampleOutputStream::write(SampleRecord const &sample) {
s << (value ? value.get() : "U");
}
- *stream.get() << endl;
+ s << endl << flush;
}
SqlSampleOutputStream::SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name) :
@@ -292,7 +292,7 @@ void SqlSampleOutputStream::write(SampleRecord const &values) {
// }
// }
//
-// (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl;
+// (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl << flush;
}
void KeyValueSampleStreamParser::process(mutable_buffers_1 buffer) {
@@ -321,28 +321,27 @@ void KeyValueSampleStreamParser::process(mutable_buffers_1 buffer) {
}
void KeyValueSampleStreamParser::process_line(shared_ptr<vector<uint8_t>> packet) {
- auto timestamp = std::chrono::system_clock::now().time_since_epoch().count();
auto s = std::string((char *) packet->data(), packet->size());
- static const boost::regex e("([#_a-zA-Z0-9]+) *= *([0-9]+)");
-
- auto start = s.cbegin();
- auto end = s.cend();
- boost::match_results <std::string::const_iterator> what;
- boost::match_flag_type flags = boost::match_default;
+ typedef tokenizer<escaped_list_separator<char>> Tokenizer;
+ Tokenizer tokens(s);
SampleRecord sample(dict);
- while (regex_search(start, end, what, e, flags)) {
- auto name = static_cast<string>(what[1]);
- auto value = static_cast<string>(what[2]);
- start = what[0].second;
+ for (auto token : tokens) {
+ auto index = token.find('=');
+
+ if (index == string::npos) {
+ continue;
+ }
+
+ auto name = token.substr(0, index);
+ boost::algorithm::trim(name);
+ auto value = token.substr(index + 1);
+ boost::algorithm::trim(value);
auto key = dict.indexOf(name);
sample.set(key, value);
-
- flags |= boost::match_prev_avail;
- flags |= boost::match_not_bob;
}
output->write(sample);