aboutsummaryrefslogtreecommitdiff
path: root/sensor/main
diff options
context:
space:
mode:
Diffstat (limited to 'sensor/main')
-rw-r--r--sensor/main/io.cpp186
1 files changed, 98 insertions, 88 deletions
diff --git a/sensor/main/io.cpp b/sensor/main/io.cpp
index e73b9d4..255b4f5 100644
--- a/sensor/main/io.cpp
+++ b/sensor/main/io.cpp
@@ -1,7 +1,6 @@
#include "trygvis/sensor/io.h"
#include <map>
-#include <chrono>
#include "json.hpp"
#include "boost/tokenizer.hpp"
#include <boost/algorithm/string.hpp>
@@ -16,6 +15,52 @@ using boost::tokenizer;
using boost::escaped_list_separator;
using json = nlohmann::json;
+unique_ptr<SampleStreamParser> open_sample_stream_parser(
+ shared_ptr<SampleOutputStream> output,
+ KeyDictionary &dict,
+ sample_format_type type) {
+ if (type == sample_format_type::KEY_VALUE) {
+ return make_unique<KeyValueSampleStreamParser>(output, dict);
+ } else if (type == sample_format_type::AUTO) {
+ return make_unique<AutoSampleParser>(output, dict);
+ } else {
+ throw sample_exception("No parser for format type: " + to_string(type));
+ }
+}
+
+unique_ptr<SampleOutputStream> open_sample_output_stream(
+ shared_ptr<ostream> output,
+ KeyDictionary &dict,
+ sample_format_type type,
+ sample_output_stream_options options) {
+
+ if (type == sample_format_type::CSV) {
+ return make_unique<CsvSampleOutputStream>(output, dict);
+ } else if (type == sample_format_type::KEY_VALUE) {
+ return make_unique<KeyValueSampleOutputStream>(output, dict);
+ } else if (type == sample_format_type::JSON) {
+ return make_unique<JsonSampleOutputStream>(output, dict);
+ } else if (type == sample_format_type::RRD) {
+ auto of = options.find_option<output_fields_option>();
+
+ auto tsf = options.find_option<timestamp_field_option>();
+
+ auto timestamp_key = dict.indexOf(tsf ? tsf.get()->name : "timestamp");
+
+ return make_unique<RrdSampleOutputStream>(output, dict, timestamp_key, of);
+ } else if (type == sample_format_type::SQL) {
+ auto tno = options.find_option<table_name_option>();
+
+ if (!tno.is_initialized()) {
+ throw missing_required_option_error("table name");
+ }
+
+ return make_unique<SqlSampleOutputStream>(move(output), dict, tno.get()->name);
+ } else {
+ throw sample_exception("No writer for format type: " + to_string(type));
+ }
+}
+
ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying)
: underlying(move(underlying)) {
}
@@ -33,6 +78,11 @@ AddTimestampSampleOutputStream::AddTimestampSampleOutputStream(unique_ptr<Sample
}
void AddTimestampSampleOutputStream::write(SampleRecord const &sample) {
+ if (sample.at(timestamp_key)) {
+ underlying_->write(sample);
+ return;
+ }
+
auto time_since_epoch = system_clock::now().time_since_epoch();
auto timestamp = duration_cast<seconds>(time_since_epoch).count();
auto timestamp_s = std::to_string(timestamp);
@@ -264,53 +314,53 @@ SqlSampleOutputStream::SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDict
dict(dict), stream(move(stream)), table_name(table_name) {
}
-void SqlSampleOutputStream::write(SampleRecord const &values) {
- throw sample_exception("deimplemented");
-
-// string fs, vs;
-//
-// fs.reserve(1024);
-// vs.reserve(1024);
-//
-// if (filter_fields) {
-// auto i = fields.begin();
-//
-// while (i != fields.end()) {
-// auto field = *i;
-//
-// fs += field;
-//
-// auto value = values.find(field);
-//
-// if (value != values.end()) {
-// vs += "'" + value->second + "'";
-// } else {
-// vs += "NULL";
-// }
-//
-// i++;
-//
-// if (i != fields.end()) {
-// fs += ",";
-// vs += ",";
-// }
-// }
-// } else {
-// auto i = values.begin();
-// while (i != values.end()) {
-// auto v = *i++;
-//
-// fs += v.first;
-// vs += "'" + v.second + "'";
-//
-// if (i != values.end()) {
-// fs += ",";
-// vs += ",";
-// }
-// }
-// }
-//
-// (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl << flush;
+void SqlSampleOutputStream::write(SampleRecord const &sample) {
+ string fs, vs;
+
+ fs.reserve(1024);
+ vs.reserve(1024);
+
+ auto &s = *stream.get();
+
+ bool first = true;
+ if (!dict.empty()) {
+ for (auto &key: dict) {
+ auto sample_key = sample.dict.indexOf(key->name);
+
+ auto value = sample.at(sample_key);
+
+ if (value) {
+ if (first) {
+ first = false;
+ } else {
+ fs += ", ";
+ vs += ", ";
+ }
+ fs += key->name;
+ vs += value.get();
+ }
+ }
+ } else {
+ for (auto &sample_key: sample.dict) {
+ auto o = sample.at(sample_key);
+
+ if (o) {
+ if (first) {
+ first = false;
+ } else {
+ fs += ", ";
+ vs += ", ";
+ }
+ // Make sure that the key is registered in the dictionary
+ dict.indexOf(sample_key->name);
+
+ fs += sample_key->name;
+ vs += o.get();
+ }
+ }
+ }
+
+ s << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl << flush;
}
void KeyValueSampleStreamParser::process(mutable_buffers_1 &buffer) {
@@ -380,46 +430,6 @@ void AutoSampleParser::process(mutable_buffers_1 &buffer) {
}
}
-unique_ptr<SampleStreamParser> open_sample_stream_parser(
- shared_ptr<SampleOutputStream> output,
- KeyDictionary &dict,
- sample_format_type type) {
- if (type == sample_format_type::KEY_VALUE) {
- return make_unique<KeyValueSampleStreamParser>(output, dict);
- } else if (type == sample_format_type::AUTO) {
- return make_unique<AutoSampleParser>(output, dict);
- } else {
- throw sample_exception("No parser for format type: " + to_string(type));
- }
-}
-
-unique_ptr<SampleOutputStream> open_sample_output_stream(
- shared_ptr<ostream> output,
- KeyDictionary &dict,
- sample_format_type type,
- sample_output_stream_options options) {
-
- if (type == sample_format_type::CSV) {
- return make_unique<CsvSampleOutputStream>(output, dict);
- } else if (type == sample_format_type::KEY_VALUE) {
- return make_unique<KeyValueSampleOutputStream>(output, dict);
- } else if (type == sample_format_type::JSON) {
- return make_unique<JsonSampleOutputStream>(output, dict);
- } else if (type == sample_format_type::RRD) {
- o<output_fields_option *> of = options.find_option<output_fields_option>();
-
- o<timestamp_field_option *> tsf = options.find_option<timestamp_field_option>();
-
- auto timestamp_key = dict.indexOf(tsf ? tsf.get()->name : "timestamp");
-
- return make_unique<RrdSampleOutputStream>(output, dict, timestamp_key, of);
-// } else if (type == sample_format_type::SQL) {
-// return make_unique<SqlSampleOutputStream>(dict, move(output), table_name);
- } else {
- throw sample_exception("No writer for format type: " + to_string(type));
- }
-}
-
}
}
} \ No newline at end of file