diff options
Diffstat (limited to 'sensor/main')
-rw-r--r-- | sensor/main/io.cpp | 186 |
1 files changed, 98 insertions, 88 deletions
diff --git a/sensor/main/io.cpp b/sensor/main/io.cpp index e73b9d4..255b4f5 100644 --- a/sensor/main/io.cpp +++ b/sensor/main/io.cpp @@ -1,7 +1,6 @@ #include "trygvis/sensor/io.h" #include <map> -#include <chrono> #include "json.hpp" #include "boost/tokenizer.hpp" #include <boost/algorithm/string.hpp> @@ -16,6 +15,52 @@ using boost::tokenizer; using boost::escaped_list_separator; using json = nlohmann::json; +unique_ptr<SampleStreamParser> open_sample_stream_parser( + shared_ptr<SampleOutputStream> output, + KeyDictionary &dict, + sample_format_type type) { + if (type == sample_format_type::KEY_VALUE) { + return make_unique<KeyValueSampleStreamParser>(output, dict); + } else if (type == sample_format_type::AUTO) { + return make_unique<AutoSampleParser>(output, dict); + } else { + throw sample_exception("No parser for format type: " + to_string(type)); + } +} + +unique_ptr<SampleOutputStream> open_sample_output_stream( + shared_ptr<ostream> output, + KeyDictionary &dict, + sample_format_type type, + sample_output_stream_options options) { + + if (type == sample_format_type::CSV) { + return make_unique<CsvSampleOutputStream>(output, dict); + } else if (type == sample_format_type::KEY_VALUE) { + return make_unique<KeyValueSampleOutputStream>(output, dict); + } else if (type == sample_format_type::JSON) { + return make_unique<JsonSampleOutputStream>(output, dict); + } else if (type == sample_format_type::RRD) { + auto of = options.find_option<output_fields_option>(); + + auto tsf = options.find_option<timestamp_field_option>(); + + auto timestamp_key = dict.indexOf(tsf ? tsf.get()->name : "timestamp"); + + return make_unique<RrdSampleOutputStream>(output, dict, timestamp_key, of); + } else if (type == sample_format_type::SQL) { + auto tno = options.find_option<table_name_option>(); + + if (!tno.is_initialized()) { + throw missing_required_option_error("table name"); + } + + return make_unique<SqlSampleOutputStream>(move(output), dict, tno.get()->name); + } else { + throw sample_exception("No writer for format type: " + to_string(type)); + } +} + ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying) : underlying(move(underlying)) { } @@ -33,6 +78,11 @@ AddTimestampSampleOutputStream::AddTimestampSampleOutputStream(unique_ptr<Sample } void AddTimestampSampleOutputStream::write(SampleRecord const &sample) { + if (sample.at(timestamp_key)) { + underlying_->write(sample); + return; + } + auto time_since_epoch = system_clock::now().time_since_epoch(); auto timestamp = duration_cast<seconds>(time_since_epoch).count(); auto timestamp_s = std::to_string(timestamp); @@ -264,53 +314,53 @@ SqlSampleOutputStream::SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDict dict(dict), stream(move(stream)), table_name(table_name) { } -void SqlSampleOutputStream::write(SampleRecord const &values) { - throw sample_exception("deimplemented"); - -// string fs, vs; -// -// fs.reserve(1024); -// vs.reserve(1024); -// -// if (filter_fields) { -// auto i = fields.begin(); -// -// while (i != fields.end()) { -// auto field = *i; -// -// fs += field; -// -// auto value = values.find(field); -// -// if (value != values.end()) { -// vs += "'" + value->second + "'"; -// } else { -// vs += "NULL"; -// } -// -// i++; -// -// if (i != fields.end()) { -// fs += ","; -// vs += ","; -// } -// } -// } else { -// auto i = values.begin(); -// while (i != values.end()) { -// auto v = *i++; -// -// fs += v.first; -// vs += "'" + v.second + "'"; -// -// if (i != values.end()) { -// fs += ","; -// vs += ","; -// } -// } -// } -// -// (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl << flush; +void SqlSampleOutputStream::write(SampleRecord const &sample) { + string fs, vs; + + fs.reserve(1024); + vs.reserve(1024); + + auto &s = *stream.get(); + + bool first = true; + if (!dict.empty()) { + for (auto &key: dict) { + auto sample_key = sample.dict.indexOf(key->name); + + auto value = sample.at(sample_key); + + if (value) { + if (first) { + first = false; + } else { + fs += ", "; + vs += ", "; + } + fs += key->name; + vs += value.get(); + } + } + } else { + for (auto &sample_key: sample.dict) { + auto o = sample.at(sample_key); + + if (o) { + if (first) { + first = false; + } else { + fs += ", "; + vs += ", "; + } + // Make sure that the key is registered in the dictionary + dict.indexOf(sample_key->name); + + fs += sample_key->name; + vs += o.get(); + } + } + } + + s << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl << flush; } void KeyValueSampleStreamParser::process(mutable_buffers_1 &buffer) { @@ -380,46 +430,6 @@ void AutoSampleParser::process(mutable_buffers_1 &buffer) { } } -unique_ptr<SampleStreamParser> open_sample_stream_parser( - shared_ptr<SampleOutputStream> output, - KeyDictionary &dict, - sample_format_type type) { - if (type == sample_format_type::KEY_VALUE) { - return make_unique<KeyValueSampleStreamParser>(output, dict); - } else if (type == sample_format_type::AUTO) { - return make_unique<AutoSampleParser>(output, dict); - } else { - throw sample_exception("No parser for format type: " + to_string(type)); - } -} - -unique_ptr<SampleOutputStream> open_sample_output_stream( - shared_ptr<ostream> output, - KeyDictionary &dict, - sample_format_type type, - sample_output_stream_options options) { - - if (type == sample_format_type::CSV) { - return make_unique<CsvSampleOutputStream>(output, dict); - } else if (type == sample_format_type::KEY_VALUE) { - return make_unique<KeyValueSampleOutputStream>(output, dict); - } else if (type == sample_format_type::JSON) { - return make_unique<JsonSampleOutputStream>(output, dict); - } else if (type == sample_format_type::RRD) { - o<output_fields_option *> of = options.find_option<output_fields_option>(); - - o<timestamp_field_option *> tsf = options.find_option<timestamp_field_option>(); - - auto timestamp_key = dict.indexOf(tsf ? tsf.get()->name : "timestamp"); - - return make_unique<RrdSampleOutputStream>(output, dict, timestamp_key, of); -// } else if (type == sample_format_type::SQL) { -// return make_unique<SqlSampleOutputStream>(dict, move(output), table_name); - } else { - throw sample_exception("No writer for format type: " + to_string(type)); - } -} - } } }
\ No newline at end of file |