aboutsummaryrefslogtreecommitdiff
path: root/sensor/main/io.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'sensor/main/io.cpp')
-rw-r--r--sensor/main/io.cpp40
1 files changed, 31 insertions, 9 deletions
diff --git a/sensor/main/io.cpp b/sensor/main/io.cpp
index fa15c96..a37844a 100644
--- a/sensor/main/io.cpp
+++ b/sensor/main/io.cpp
@@ -363,33 +363,47 @@ void SqlSampleOutputStream::write(SampleRecord const &sample) {
s << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl << flush;
}
-void KeyValueSampleStreamParser::process(mutable_buffers_1 &buffer) {
+int KeyValueSampleStreamParser::process(mutable_buffers_1 &buffer) {
size_t size = buffer_size(buffer);
- if (size == 0 && line->size()) {
- process_line(line);
- line = make_shared<vector<uint8_t>>();
- return;
+ if (size == 0) {
+ return 0;
}
auto data = boost::asio::buffer_cast<const uint8_t *>(buffer);
+ int count = 0;
for (int i = 0; i < size; i++) {
uint8_t b = data[i];
- if (b == packet_delimiter) {
+ if (b == '\0') {
+ continue;
+ } else if (b == packet_delimiter) {
process_line(line);
+ count++;
line = make_shared<vector<uint8_t>>();
} else {
line->push_back(b);
}
}
+ return count;
}
-void KeyValueSampleStreamParser::process_line(shared_ptr<vector<uint8_t>> packet) {
+int KeyValueSampleStreamParser::finish() {
+ if (line->size()) {
+ process_line(line);
+
+ return 1;
+ }
+
+ return 0;
+}
+
+void KeyValueSampleStreamParser::process_line(shared_ptr<vector<uint8_t>> &packet) {
auto s = std::string((char *) packet->data(), packet->size());
+// boost::algorithm::erase_all(s, "\0");
typedef tokenizer<escaped_list_separator<char>> Tokenizer;
Tokenizer tokens(s);
@@ -422,9 +436,17 @@ AutoSampleParser::AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDic
type_ = sample_format_type::KEY_VALUE;
}
-void AutoSampleParser::process(mutable_buffers_1 &buffer) {
+int AutoSampleParser::process(mutable_buffers_1 &buffer) {
+ if (parser) {
+ return parser->process(buffer);
+ } else {
+ throw runtime_error("Not implemented yet");
+ }
+}
+
+int AutoSampleParser::finish() {
if (parser) {
- parser->process(buffer);
+ return parser->finish();
} else {
throw runtime_error("Not implemented yet");
}