From 2a7ffd694cfa3493ef1b83a69878322b8ca97670 Mon Sep 17 00:00:00 2001
From: Trygve Laugstøl
Date: Sat, 14 Mar 2015 23:10:13 +0100
Subject: o Updating to new API.

---
 apps/SoilMoistureIo.cpp | 272 ++++++++++++++++++++++++++++--------------------
 1 file changed, 157 insertions(+), 115 deletions(-)

(limited to 'apps/SoilMoistureIo.cpp')

diff --git a/apps/SoilMoistureIo.cpp b/apps/SoilMoistureIo.cpp
index 315f48d..b8a8b64 100644
--- a/apps/SoilMoistureIo.cpp
+++ b/apps/SoilMoistureIo.cpp
@@ -11,19 +11,51 @@ namespace soil_moisture {
 using namespace std;
 using json = nlohmann::json;
 
-void VectorSampleOutputStream::write(Sample sample) {
+void VectorSampleOutputStream::write(SampleRecord sample) {
     samples.emplace_back(sample);
 }
 
-CsvSampleOutputStream::CsvSampleOutputStream(unique_ptr stream) :
-        stream(move(stream)), filterFields(false), headerWritten(false) {
+vector KeyDictionary::findIndexes(vector keys) {
+    vector indexes;
+
+    for (auto &key: keys) {
+        auto index = indexOf(key);
+        indexes.push_back(index);
+    }
+
+    return move(indexes);
+}
+
+CsvSampleOutputStream::CsvSampleOutputStream(KeyDictionary &dict, unique_ptr stream) :
+        dict(dict), stream(move(stream)), headerWritten(false) {
 }
 
-CsvSampleOutputStream::CsvSampleOutputStream(unique_ptr stream, vector fields) :
-        stream(move(stream)), fields(fields), filterFields(true), headerWritten(false) {
+CsvSampleOutputStream::CsvSampleOutputStream(KeyDictionary &dict, unique_ptr stream, vector fieldKeys)
+        :
+        dict(dict), stream(move(stream)), headerWritten(false), fields(dict.findIndexes(fieldKeys)) {
 }
 
-void CsvSampleOutputStream::write(Sample values) {
+void CsvSampleOutputStream::write(SampleRecord values) {
+    // Skip empty records
+    if (values.empty()) {
+        return;
+    }
+
+    if (fields.empty()) {
+        KeyDictionary::index_t index = 0;
+        auto ptr = values.begin();
+        while (ptr != values.end()) {
+            auto o = *ptr;
+
+            if (o) {
+                fields.push_back(index);
+            }
+
+            ptr++;
+            index++;
+        }
+    }
+
     if (!headerWritten) {
         writeHeader();
         headerWritten = true;
@@ -31,27 +63,17 @@ void CsvSampleOutputStream::write(Sample values) {
 
     auto &s = *stream.get();
 
-    if (filterFields) {
-        auto i = fields.begin();
-        while (i != fields.end()) {
-            if (i != fields.begin()) {
-                s << ",";
-            }
-
-            auto field = *i++;
-            auto value = values.find(field);
-
-            if (value != values.end()) {
-                s << value->second;
-            }
+    auto i = fields.begin();
+    while (i != fields.end()) {
+        if (i != fields.begin()) {
+            s << ",";
        }
-    } else {
-        for (auto i = values.begin(); i != values.end();) {
-            s << "\"" << (*i).second << "\"";
-            if (++i != values.end()) {
-                s << ",";
-            }
+        auto index = *i++;
+        auto o = values.at(index);
+
+        if (o) {
+            s << o.get();
         }
     }
 
@@ -59,15 +81,11 @@ void CsvSampleOutputStream::writeHeader() {
-    if (fields.size() == 0) {
-        return;
-    }
-
     auto &s = *stream.get();
 
     auto i = fields.begin();
     while (i != fields.end()) {
-        s << *i;
+        s << dict.nameOf(*i);
 
         i++;
 
@@ -79,95 +97,112 @@
     s << endl;
 }
 
-JsonSampleOutputStream::JsonSampleOutputStream(unique_ptr stream) :
-        stream(move(stream)), fields(), filterFields(false) {
+JsonSampleOutputStream::JsonSampleOutputStream(KeyDictionary &dict, unique_ptr stream) :
+        dict(dict), stream(move(stream)), filterFields(false) {
 }
 
-JsonSampleOutputStream::JsonSampleOutputStream(unique_ptr stream, vector fields) :
-        stream(move(stream)), fields(fields), filterFields(true) {
+JsonSampleOutputStream::JsonSampleOutputStream(KeyDictionary &dict, unique_ptr stream, vector fields)
+        :
+        dict(dict), stream(move(stream)), fields(dict.findIndexes(fields)), filterFields(true) {
 }
 
-void JsonSampleOutputStream::write(Sample values) {
-    json doc({});
+void JsonSampleOutputStream::write(SampleRecord values) {
+    throw sample_exception("deimplemented");
 
-    if (filterFields) {
-        for (auto &f: fields) {
-            auto value = values.find(f);
+    json doc({});
 
-            if (value != values.end()) {
-                doc[f] = value->second;
-            }
-        }
-    } else {
-        for (auto &v: values) {
-            doc[v.first] = v.second;
-        }
-    }
+//    if (filterFields) {
+//        for (auto &f: fields) {
+//            auto value = values.find(f);
+//
+//            if (value != values.end()) {
+//                doc[f] = value->second;
+//            }
+//        }
+//    } else {
+//        for (auto &v: values) {
+//            doc[v.first] = v.second;
+//        }
+//    }
 
     *stream.get() << doc << endl;
 }
 
-SqlSampleOutputStream::SqlSampleOutputStream(unique_ptr stream, string table_name) :
-        stream(move(stream)), table_name(table_name), filter_fields(false) {
+SqlSampleOutputStream::SqlSampleOutputStream(KeyDictionary &dict, unique_ptr stream, string table_name) :
+        dict(dict), stream(move(stream)), table_name(table_name), filter_fields(false) {
 }
 
-SqlSampleOutputStream::SqlSampleOutputStream(unique_ptr stream, string table_name, vector fields) :
-        stream(move(stream)), table_name(table_name), fields(fields), filter_fields(true) {
+SqlSampleOutputStream::SqlSampleOutputStream(KeyDictionary &dict, unique_ptr stream, string table_name, vector fields)
+        :
+        dict(dict),
+        stream(move(stream)),
+        table_name(table_name),
+        fields(dict.findIndexes(fields)),
+        filter_fields(true) {
 }
 
-void SqlSampleOutputStream::write(Sample values) {
-    string fs, vs;
-
-    fs.reserve(1024);
-    vs.reserve(1024);
-
-    if (filter_fields) {
-        auto i = fields.begin();
-
-        while (i != fields.end()) {
-            auto field = *i;
-
-            fs += field;
-
-            auto value = values.find(field);
-
-            if (value != values.end()) {
-                vs += "'" + value->second + "'";
-            } else {
-                vs += "NULL";
-            }
-
-            i++;
+void SqlSampleOutputStream::write(SampleRecord values) {
+    throw sample_exception("deimplemented");
+
+//    string fs, vs;
+//
+//    fs.reserve(1024);
+//    vs.reserve(1024);
+//
+//    if (filter_fields) {
+//        auto i = fields.begin();
+//
+//        while (i != fields.end()) {
+//            auto field = *i;
+//
+//            fs += field;
+//
+//            auto value = values.find(field);
+//
+//            if (value != values.end()) {
+//                vs += "'" + value->second + "'";
+//            } else {
+//                vs += "NULL";
+//            }
+//
+//            i++;
+//
+//            if (i != fields.end()) {
+//                fs += ",";
+//                vs += ",";
+//            }
+//        }
+//    } else {
+//        auto i = values.begin();
+//        while (i != values.end()) {
+//            auto v = *i++;
+//
+//            fs += v.first;
+//            vs += "'" + v.second + "'";
+//
+//            if (i != values.end()) {
+//                fs += ",";
+//                vs += ",";
+//            }
+//        }
+//    }
+//
+//    (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl;
+}
 
-            if (i != fields.end()) {
-                fs += ",";
-                vs += ",";
-            }
-        }
-    } else {
-        auto i = values.begin();
-        while (i != values.end()) {
-            auto v = *i++;
+void CsvSampleParser::process(mutable_buffers_1 buffer) {
 
-            fs += v.first;
-            vs += "'" + v.second + "'";
+    size_t size = buffer_size(buffer);
 
-            if (i != values.end()) {
-                fs += ",";
-                vs += ",";
-            }
-        }
+    if (size == 0 && line->size()) {
+        process_line(line);
+        line = make_shared>();
+        return;
     }
 
-    (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl;
-}
-
-void CsvSampleParser::process(mutable_buffers_1 buffer) {
-
-    size_t some = buffer_size(buffer);
     auto data = boost::asio::buffer_cast(buffer);
-    for (int i = 0; i < some; i++) {
+    for (int i = 0; i < size; i++) {
         uint8_t b = data[i];
 
         if (b == packet_delimiter) {
@@ -191,7 +226,7 @@ void CsvSampleParser::process_line(shared_ptr> packet) {
     boost::match_results what;
     boost::match_flag_type flags = boost::match_default;
 
-    Sample sample;
+    SampleRecord sample(dict);
 
     while (regex_search(start, end, what, e, flags)) {
         auto key = static_cast(what[1]);
@@ -201,19 +236,18 @@ void CsvSampleParser::process_line(shared_ptr> packet) {
         map values;
         values[key] = value;
 
-        sample.set(key, value);
+        auto index = dict.indexOf(key);
+        sample.set(index, value);
 
        flags |= boost::match_prev_avail;
        flags |= boost::match_not_bob;
    }
 
-    if (sample.begin() != sample.end()) {
-        output->write(sample);
-    }
+    output->write(sample);
 }
 
-AutoSampleParser::AutoSampleParser(shared_ptr output) :
-        SampleStreamParser(sample_format_type::AUTO), csvParser(new CsvSampleParser(output)) {
+AutoSampleParser::AutoSampleParser(KeyDictionary &dict, shared_ptr output) :
+        SampleStreamParser(sample_format_type::AUTO), csvParser(new CsvSampleParser(dict, output)) {
     // Directly select the parser now until we have more than one parser
     parser = std::move(csvParser);
     type_ = sample_format_type::CSV;
@@ -240,34 +274,42 @@ string to_string(const sample_format_type &arg) {
     throw std::runtime_error("Unknown format value: " + to_string(arg));
 }
 
-unique_ptr open_sample_input_stream(shared_ptr output, sample_format_type type) {
+unique_ptr open_sample_input_stream(
+        KeyDictionary &dict,
+        shared_ptr output,
+        sample_format_type type) {
     if (type == sample_format_type::CSV) {
-        return make_unique(output);
+        return make_unique(dict, output);
     } else if (type == sample_format_type::AUTO) {
-        return make_unique(output);
+        return make_unique(dict, output);
     } else {
         throw sample_exception("Unsupported format type: " + to_string(type));
     }
 }
 
-unique_ptr open_sample_output_stream(sample_format_type type, unique_ptr output, boost::optional> fields) {
+unique_ptr open_sample_output_stream(
+        KeyDictionary &dict,
+        sample_format_type type,
+        unique_ptr output,
+        o> fields) {
+
     if (type == sample_format_type::CSV) {
         if (fields) {
-            return make_unique(move(output), fields.get());
+            return make_unique(dict, move(output), fields.get());
         } else {
-            return make_unique(move(output));
+            return make_unique(dict, move(output));
         }
     } else if (type == sample_format_type::JSON) {
         if (fields) {
-            return make_unique(move(output), fields.get());
+            return make_unique(dict, move(output), fields.get());
         } else {
-            return make_unique(move(output));
+            return make_unique(dict, move(output));
        }
 //    } else if (type == sample_format_type::SQL) {
 //        if (fields) {
-//            return make_unique(move(output), table_name, fields.get());
+//            return make_unique(dict, move(output), table_name, fields.get());
 //        } else {
-//            return make_unique(move(output), table_name);
+//            return make_unique(dict, move(output), table_name);
 //        }
     } else {
         throw sample_exception("Unsupported format type: " + to_string(type));
--
cgit v1.2.3
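
Usage sketch (not part of the patch): the change threads a KeyDictionary through the parsers and output streams, so field keys are interned to indexes once via dict.indexOf, a SampleRecord carries its values by index, and CsvSampleOutputStream resolves names back with dict.nameOf when it writes the header. The example below shows how that API might be driven. It assumes the declarations live in a project header (called "SoilMoistureIo.h" here), that the constructor's stream parameter is a unique_ptr<ostream> and that SampleRecord::set takes a string value (the template arguments and exact signatures are stripped in this rendering of the diff), and that the enclosing namespace is soil_moisture as shown in the first hunk header.

// Hypothetical driver for the new KeyDictionary / SampleRecord API.
// Header name, template parameters and set()'s value type are assumptions;
// only the member names (indexOf, nameOf, set, write) come from the diff.
#include <iostream>
#include <memory>
#include <utility>

#include "SoilMoistureIo.h"

using namespace soil_moisture;

int main() {
    KeyDictionary dict;

    // Intern the field keys once; records refer to them by index afterwards.
    auto temperature = dict.indexOf("temperature");
    auto humidity = dict.indexOf("humidity");

    // Wrap stdout in an ostream owned by the output stream.
    auto out = std::make_unique<std::ostream>(std::cout.rdbuf());
    CsvSampleOutputStream csv(dict, std::move(out));

    SampleRecord sample(dict);
    sample.set(temperature, "21.5");
    sample.set(humidity, "40");

    // The first write() emits the CSV header via dict.nameOf(index), then the row.
    csv.write(sample);

    return 0;
}

Routing every key through the shared KeyDictionary is what lets CsvSampleOutputStream build its field list lazily from the first non-empty record and print header names with dict.nameOf instead of carrying string keys in every sample.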