From 34669098e138d595aadc39fbf8c0cdd004c0916d Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sat, 28 Mar 2015 22:22:16 +0100 Subject: o Adding back SQL output. --- sensor/main/io.cpp | 186 ++++++++++++++++++++++++++++------------------------- 1 file changed, 98 insertions(+), 88 deletions(-) (limited to 'sensor/main') diff --git a/sensor/main/io.cpp b/sensor/main/io.cpp index e73b9d4..255b4f5 100644 --- a/sensor/main/io.cpp +++ b/sensor/main/io.cpp @@ -1,7 +1,6 @@ #include "trygvis/sensor/io.h" #include -#include #include "json.hpp" #include "boost/tokenizer.hpp" #include @@ -16,6 +15,52 @@ using boost::tokenizer; using boost::escaped_list_separator; using json = nlohmann::json; +unique_ptr open_sample_stream_parser( + shared_ptr output, + KeyDictionary &dict, + sample_format_type type) { + if (type == sample_format_type::KEY_VALUE) { + return make_unique(output, dict); + } else if (type == sample_format_type::AUTO) { + return make_unique(output, dict); + } else { + throw sample_exception("No parser for format type: " + to_string(type)); + } +} + +unique_ptr open_sample_output_stream( + shared_ptr output, + KeyDictionary &dict, + sample_format_type type, + sample_output_stream_options options) { + + if (type == sample_format_type::CSV) { + return make_unique(output, dict); + } else if (type == sample_format_type::KEY_VALUE) { + return make_unique(output, dict); + } else if (type == sample_format_type::JSON) { + return make_unique(output, dict); + } else if (type == sample_format_type::RRD) { + auto of = options.find_option(); + + auto tsf = options.find_option(); + + auto timestamp_key = dict.indexOf(tsf ? tsf.get()->name : "timestamp"); + + return make_unique(output, dict, timestamp_key, of); + } else if (type == sample_format_type::SQL) { + auto tno = options.find_option(); + + if (!tno.is_initialized()) { + throw missing_required_option_error("table name"); + } + + return make_unique(move(output), dict, tno.get()->name); + } else { + throw sample_exception("No writer for format type: " + to_string(type)); + } +} + ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr underlying) : underlying(move(underlying)) { } @@ -33,6 +78,11 @@ AddTimestampSampleOutputStream::AddTimestampSampleOutputStream(unique_ptrwrite(sample); + return; + } + auto time_since_epoch = system_clock::now().time_since_epoch(); auto timestamp = duration_cast(time_since_epoch).count(); auto timestamp_s = std::to_string(timestamp); @@ -264,53 +314,53 @@ SqlSampleOutputStream::SqlSampleOutputStream(shared_ptr stream, KeyDict dict(dict), stream(move(stream)), table_name(table_name) { } -void SqlSampleOutputStream::write(SampleRecord const &values) { - throw sample_exception("deimplemented"); - -// string fs, vs; -// -// fs.reserve(1024); -// vs.reserve(1024); -// -// if (filter_fields) { -// auto i = fields.begin(); -// -// while (i != fields.end()) { -// auto field = *i; -// -// fs += field; -// -// auto value = values.find(field); -// -// if (value != values.end()) { -// vs += "'" + value->second + "'"; -// } else { -// vs += "NULL"; -// } -// -// i++; -// -// if (i != fields.end()) { -// fs += ","; -// vs += ","; -// } -// } -// } else { -// auto i = values.begin(); -// while (i != values.end()) { -// auto v = *i++; -// -// fs += v.first; -// vs += "'" + v.second + "'"; -// -// if (i != values.end()) { -// fs += ","; -// vs += ","; -// } -// } -// } -// -// (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl << flush; +void SqlSampleOutputStream::write(SampleRecord const &sample) { + string fs, vs; + + fs.reserve(1024); + vs.reserve(1024); + + auto &s = *stream.get(); + + bool first = true; + if (!dict.empty()) { + for (auto &key: dict) { + auto sample_key = sample.dict.indexOf(key->name); + + auto value = sample.at(sample_key); + + if (value) { + if (first) { + first = false; + } else { + fs += ", "; + vs += ", "; + } + fs += key->name; + vs += value.get(); + } + } + } else { + for (auto &sample_key: sample.dict) { + auto o = sample.at(sample_key); + + if (o) { + if (first) { + first = false; + } else { + fs += ", "; + vs += ", "; + } + // Make sure that the key is registered in the dictionary + dict.indexOf(sample_key->name); + + fs += sample_key->name; + vs += o.get(); + } + } + } + + s << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl << flush; } void KeyValueSampleStreamParser::process(mutable_buffers_1 &buffer) { @@ -380,46 +430,6 @@ void AutoSampleParser::process(mutable_buffers_1 &buffer) { } } -unique_ptr open_sample_stream_parser( - shared_ptr output, - KeyDictionary &dict, - sample_format_type type) { - if (type == sample_format_type::KEY_VALUE) { - return make_unique(output, dict); - } else if (type == sample_format_type::AUTO) { - return make_unique(output, dict); - } else { - throw sample_exception("No parser for format type: " + to_string(type)); - } -} - -unique_ptr open_sample_output_stream( - shared_ptr output, - KeyDictionary &dict, - sample_format_type type, - sample_output_stream_options options) { - - if (type == sample_format_type::CSV) { - return make_unique(output, dict); - } else if (type == sample_format_type::KEY_VALUE) { - return make_unique(output, dict); - } else if (type == sample_format_type::JSON) { - return make_unique(output, dict); - } else if (type == sample_format_type::RRD) { - o of = options.find_option(); - - o tsf = options.find_option(); - - auto timestamp_key = dict.indexOf(tsf ? tsf.get()->name : "timestamp"); - - return make_unique(output, dict, timestamp_key, of); -// } else if (type == sample_format_type::SQL) { -// return make_unique(dict, move(output), table_name); - } else { - throw sample_exception("No writer for format type: " + to_string(type)); - } -} - } } } \ No newline at end of file -- cgit v1.2.3