-rw-r--r--   apps/sample-convert.cpp            |  16
-rw-r--r--   sensor/include/trygvis/sensor.h    |   3
-rw-r--r--   sensor/include/trygvis/sensor/io.h |  86
-rw-r--r--   sensor/main/io.cpp                 | 186
4 files changed, 165 insertions, 126 deletions
diff --git a/apps/sample-convert.cpp b/apps/sample-convert.cpp
index 1cedf42..baa1447 100644
--- a/apps/sample-convert.cpp
+++ b/apps/sample-convert.cpp
@@ -16,9 +16,14 @@ using boost::tokenizer;
 namespace po = boost::program_options;
 
 class sample_convert : public app {
+private:
     string fields;
     string timestamp_field;
     bool add_timestamp;
+    string input_file, output_file;
+    sample_format_type output_format;
+
+    string table_name;
 
 public:
     void add_options(po::options_description_easy_init &options) override {
@@ -74,6 +79,11 @@ public:
 
         options.push_back(&tf);
 
+        table_name_option tno(table_name);
+        if (table_name != "") {
+            options.push_back(&tno);
+        }
+
         tokenizer<> tok(fields);
         output_fields_option fs;
         std::copy(tok.begin(), tok.end(), std::back_inserter(fs.fields));
@@ -100,12 +110,6 @@ public:
 
         return EXIT_SUCCESS;
     }
-
-private:
-    string input_file, output_file;
-    sample_format_type output_format;
-
-    string table_name;
 };
 
 }
diff --git a/sensor/include/trygvis/sensor.h b/sensor/include/trygvis/sensor.h
index 4662bab..f8cfbe5 100644
--- a/sensor/include/trygvis/sensor.h
+++ b/sensor/include/trygvis/sensor.h
@@ -148,6 +148,9 @@ public:
             : dict(dict), values(values) {
     }
 
+    SampleRecord(const SampleRecord &copy) : dict(copy.dict), values(copy.values) {
+    }
+
     inline vec::const_iterator cbegin() const {
         return values.cbegin();
diff --git a/sensor/include/trygvis/sensor/io.h b/sensor/include/trygvis/sensor/io.h
index f92b800..f86f2d9 100644
--- a/sensor/include/trygvis/sensor/io.h
+++ b/sensor/include/trygvis/sensor/io.h
@@ -12,29 +12,37 @@ namespace io {
 using namespace std;
 using namespace boost::asio;
 
-class sample_output_stream_option {
-public:
+struct sample_output_stream_option {
     virtual ~sample_output_stream_option() {
     };
 };
 
-class output_fields_option : public sample_output_stream_option {
-public:
+struct output_fields_option : sample_output_stream_option {
     ~output_fields_option() {
     }
 
     vector<string> fields;
 };
 
-class timestamp_field_option : public sample_output_stream_option {
-public:
-    timestamp_field_option(string name) : name(name) {
+struct timestamp_field_option : sample_output_stream_option {
+    timestamp_field_option(const string name) : name(name) {
     }
 
     ~timestamp_field_option() {
     }
 
-    string name;
+    const string name;
+};
+
+class table_name_option : public sample_output_stream_option {
+public:
+    table_name_option(const string name) : name(name) {
+    }
+
+    ~table_name_option() {
+    }
+
+    const string name;
 };
 
 class sample_output_stream_options : public vector<sample_output_stream_option *> {
@@ -56,6 +64,44 @@ public:
     }
 };
 
+struct missing_required_option_error : runtime_error {
+    missing_required_option_error(string what) : runtime_error(what) {
+    }
+
+    ~missing_required_option_error() {
+    }
+};
+
+class SampleStreamParser;
+
+class SampleOutputStream;
+
+/**
+ * Throws missing_required_option_error
+ */
+unique_ptr<SampleStreamParser> open_sample_stream_parser(
+        shared_ptr<SampleOutputStream> output,
+        KeyDictionary &dict,
+        sample_format_type type = sample_format_type::AUTO);
+
+/**
+ * Throws missing_required_option_error
+ */
+unique_ptr<SampleOutputStream> open_sample_output_stream(
+        shared_ptr<ostream> output,
+        KeyDictionary &dict,
+        sample_format_type type,
+        sample_output_stream_options options);
+
+static inline
+unique_ptr<SampleOutputStream> open_sample_output_stream(
+        shared_ptr<ostream> output,
+        KeyDictionary &dict,
+        sample_format_type type) {
+    sample_output_stream_options options;
+    return open_sample_output_stream(output, dict, type, options);
+}
+
 class SampleOutputStream {
 public:
     virtual void write(SampleRecord const &sample) = 0;
@@ -104,10 +150,6 @@ public:
 
     void write(SampleRecord const &sample);
 
-    const KeyDictionary &getDict() {
-        return dict;
-    }
-
 private:
     void writeHeader();
 
@@ -212,26 +254,6 @@ private:
     unique_ptr<KeyValueSampleStreamParser> keyValueParser;
 };
 
-unique_ptr<SampleStreamParser> open_sample_stream_parser(
-        shared_ptr<SampleOutputStream> output,
-        KeyDictionary &dict,
-        sample_format_type type = sample_format_type::AUTO);
-
-unique_ptr<SampleOutputStream> open_sample_output_stream(
-        shared_ptr<ostream> output,
-        KeyDictionary &dict,
-        sample_format_type type,
-        sample_output_stream_options options);
-
-static inline
-unique_ptr<SampleOutputStream> open_sample_output_stream(
-        shared_ptr<ostream> output,
-        KeyDictionary &dict,
-        sample_format_type type) {
-    sample_output_stream_options options;
-    return open_sample_output_stream(output, dict, type, options);
-}
-
 static inline
 unique_ptr<ThreadSafeSampleOutputStream> thread_safe_sample_output_stream(unique_ptr<SampleOutputStream> underlying) {
     return make_unique<ThreadSafeSampleOutputStream>(move(underlying));
diff --git a/sensor/main/io.cpp b/sensor/main/io.cpp
index e73b9d4..255b4f5 100644
--- a/sensor/main/io.cpp
+++ b/sensor/main/io.cpp
@@ -1,7 +1,6 @@
 #include "trygvis/sensor/io.h"
 
 #include <map>
-#include <chrono>
 #include "json.hpp"
 #include "boost/tokenizer.hpp"
 #include <boost/algorithm/string.hpp>
@@ -16,6 +15,52 @@ using boost::tokenizer;
 using boost::escaped_list_separator;
 using json = nlohmann::json;
 
+unique_ptr<SampleStreamParser> open_sample_stream_parser(
+        shared_ptr<SampleOutputStream> output,
+        KeyDictionary &dict,
+        sample_format_type type) {
+    if (type == sample_format_type::KEY_VALUE) {
+        return make_unique<KeyValueSampleStreamParser>(output, dict);
+    } else if (type == sample_format_type::AUTO) {
+        return make_unique<AutoSampleParser>(output, dict);
+    } else {
+        throw sample_exception("No parser for format type: " + to_string(type));
+    }
+}
+
+unique_ptr<SampleOutputStream> open_sample_output_stream(
+        shared_ptr<ostream> output,
+        KeyDictionary &dict,
+        sample_format_type type,
+        sample_output_stream_options options) {
+
+    if (type == sample_format_type::CSV) {
+        return make_unique<CsvSampleOutputStream>(output, dict);
+    } else if (type == sample_format_type::KEY_VALUE) {
+        return make_unique<KeyValueSampleOutputStream>(output, dict);
+    } else if (type == sample_format_type::JSON) {
+        return make_unique<JsonSampleOutputStream>(output, dict);
+    } else if (type == sample_format_type::RRD) {
+        auto of = options.find_option<output_fields_option>();
+
+        auto tsf = options.find_option<timestamp_field_option>();
+
+        auto timestamp_key = dict.indexOf(tsf ? tsf.get()->name : "timestamp");
+
+        return make_unique<RrdSampleOutputStream>(output, dict, timestamp_key, of);
+    } else if (type == sample_format_type::SQL) {
+        auto tno = options.find_option<table_name_option>();
+
+        if (!tno.is_initialized()) {
+            throw missing_required_option_error("table name");
+        }
+
+        return make_unique<SqlSampleOutputStream>(move(output), dict, tno.get()->name);
+    } else {
+        throw sample_exception("No writer for format type: " + to_string(type));
+    }
+}
+
 ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying)
         : underlying(move(underlying)) {
 }
@@ -33,6 +78,11 @@ AddTimestampSampleOutputStream::AddTimestampSampleOutputStream(unique_ptr<Sample
 }
 
 void AddTimestampSampleOutputStream::write(SampleRecord const &sample) {
+    if (sample.at(timestamp_key)) {
+        underlying_->write(sample);
+        return;
+    }
+
     auto time_since_epoch = system_clock::now().time_since_epoch();
     auto timestamp = duration_cast<seconds>(time_since_epoch).count();
     auto timestamp_s = std::to_string(timestamp);
@@ -264,53 +314,53 @@ SqlSampleOutputStream::SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDict
         dict(dict),
         stream(move(stream)),
         table_name(table_name) {
 }
-void SqlSampleOutputStream::write(SampleRecord const &values) {
-    throw sample_exception("deimplemented");
-
-//    string fs, vs;
-//
-//    fs.reserve(1024);
-//    vs.reserve(1024);
-//
-//    if (filter_fields) {
-//        auto i = fields.begin();
-//
-//        while (i != fields.end()) {
-//            auto field = *i;
-//
-//            fs += field;
-//
-//            auto value = values.find(field);
-//
-//            if (value != values.end()) {
-//                vs += "'" + value->second + "'";
-//            } else {
-//                vs += "NULL";
-//            }
-//
-//            i++;
-//
-//            if (i != fields.end()) {
-//                fs += ",";
-//                vs += ",";
-//            }
-//        }
-//    } else {
-//        auto i = values.begin();
-//        while (i != values.end()) {
-//            auto v = *i++;
-//
-//            fs += v.first;
-//            vs += "'" + v.second + "'";
-//
-//            if (i != values.end()) {
-//                fs += ",";
-//                vs += ",";
-//            }
-//        }
-//    }
-//
-//    (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl << flush;
+void SqlSampleOutputStream::write(SampleRecord const &sample) {
+    string fs, vs;
+
+    fs.reserve(1024);
+    vs.reserve(1024);
+
+    auto &s = *stream.get();
+
+    bool first = true;
+    if (!dict.empty()) {
+        for (auto &key: dict) {
+            auto sample_key = sample.dict.indexOf(key->name);
+
+            auto value = sample.at(sample_key);
+
+            if (value) {
+                if (first) {
+                    first = false;
+                } else {
+                    fs += ", ";
+                    vs += ", ";
+                }
+                fs += key->name;
+                vs += value.get();
+            }
+        }
+    } else {
+        for (auto &sample_key: sample.dict) {
+            auto o = sample.at(sample_key);
+
+            if (o) {
+                if (first) {
+                    first = false;
+                } else {
+                    fs += ", ";
+                    vs += ", ";
+                }
+                // Make sure that the key is registered in the dictionary
+                dict.indexOf(sample_key->name);
+
+                fs += sample_key->name;
+                vs += o.get();
+            }
+        }
+    }
+
+    s << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl << flush;
 }
 void KeyValueSampleStreamParser::process(mutable_buffers_1 &buffer) {
@@ -380,46 +430,6 @@ void AutoSampleParser::process(mutable_buffers_1 &buffer) {
     }
 }
 
-unique_ptr<SampleStreamParser> open_sample_stream_parser(
-        shared_ptr<SampleOutputStream> output,
-        KeyDictionary &dict,
-        sample_format_type type) {
-    if (type == sample_format_type::KEY_VALUE) {
-        return make_unique<KeyValueSampleStreamParser>(output, dict);
-    } else if (type == sample_format_type::AUTO) {
-        return make_unique<AutoSampleParser>(output, dict);
-    } else {
-        throw sample_exception("No parser for format type: " + to_string(type));
-    }
-}
-
-unique_ptr<SampleOutputStream> open_sample_output_stream(
-        shared_ptr<ostream> output,
-        KeyDictionary &dict,
-        sample_format_type type,
-        sample_output_stream_options options) {
-
-    if (type == sample_format_type::CSV) {
-        return make_unique<CsvSampleOutputStream>(output, dict);
-    } else if (type == sample_format_type::KEY_VALUE) {
-        return make_unique<KeyValueSampleOutputStream>(output, dict);
-    } else if (type == sample_format_type::JSON) {
-        return make_unique<JsonSampleOutputStream>(output, dict);
-    } else if (type == sample_format_type::RRD) {
-        o<output_fields_option *> of = options.find_option<output_fields_option>();
-
-        o<timestamp_field_option *> tsf = options.find_option<timestamp_field_option>();
-
-        auto timestamp_key = dict.indexOf(tsf ? tsf.get()->name : "timestamp");
-
-        return make_unique<RrdSampleOutputStream>(output, dict, timestamp_key, of);
-//    } else if (type == sample_format_type::SQL) {
-//        return make_unique<SqlSampleOutputStream>(dict, move(output), table_name);
-    } else {
-        throw sample_exception("No writer for format type: " + to_string(type));
-    }
-}
-
 }
 }
 }
\ No newline at end of file
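For context, a rough caller sketch (not part of this commit) showing how the relocated open_sample_output_stream() factory combines with the new table_name_option to obtain the SQL writer added above. The trygvis::sensor / trygvis::sensor::io namespace names, the default-constructed KeyDictionary, and the ostringstream sink are assumptions made for illustration, not code from the repository.

    #include "trygvis/sensor/io.h"

    #include <iostream>
    #include <memory>
    #include <sstream>

    using namespace trygvis::sensor;      // assumed outer namespaces
    using namespace trygvis::sensor::io;

    int main() {
        KeyDictionary dict;               // assumed default-constructible
        auto out = std::make_shared<std::ostringstream>();

        // SQL output requires a table name; without a table_name_option the
        // factory throws missing_required_option_error("table name").
        table_name_option tno("samples");
        sample_output_stream_options options;
        options.push_back(&tno);

        auto sql = open_sample_output_stream(out, dict, sample_format_type::SQL, options);

        // sql->write(sample) would now append
        // "INSERT INTO samples(...) VALUES(...);" to *out.
        std::cout << out->str();
        return 0;
    }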