From 2a7ffd694cfa3493ef1b83a69878322b8ca97670 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sat, 14 Mar 2015 23:10:13 +0100 Subject: o Updating to new API. --- apps/SoilMoistureIo.cpp | 272 ++++++++++++++++++++++++++-------------------- apps/SoilMoistureIo.h | 197 ++++++++++++++++++++++++--------- apps/sample-convert.cpp | 9 +- apps/sample-timestamp.cpp | 59 +++++----- apps/sm-serial-read.cpp | 10 +- 5 files changed, 347 insertions(+), 200 deletions(-) diff --git a/apps/SoilMoistureIo.cpp b/apps/SoilMoistureIo.cpp index 315f48d..b8a8b64 100644 --- a/apps/SoilMoistureIo.cpp +++ b/apps/SoilMoistureIo.cpp @@ -11,19 +11,51 @@ namespace soil_moisture { using namespace std; using json = nlohmann::json; -void VectorSampleOutputStream::write(Sample sample) { +void VectorSampleOutputStream::write(SampleRecord sample) { samples.emplace_back(sample); } -CsvSampleOutputStream::CsvSampleOutputStream(unique_ptr stream) : - stream(move(stream)), filterFields(false), headerWritten(false) { +vector KeyDictionary::findIndexes(vector keys) { + vector indexes; + + for (auto &key: keys) { + auto index = indexOf(key); + indexes.push_back(index); + } + + return move(indexes); +} + +CsvSampleOutputStream::CsvSampleOutputStream(KeyDictionary &dict, unique_ptr stream) : + dict(dict), stream(move(stream)), headerWritten(false) { } -CsvSampleOutputStream::CsvSampleOutputStream(unique_ptr stream, vector fields) : - stream(move(stream)), fields(fields), filterFields(true), headerWritten(false) { +CsvSampleOutputStream::CsvSampleOutputStream(KeyDictionary &dict, unique_ptr stream, vector fieldKeys) + : + dict(dict), stream(move(stream)), headerWritten(false), fields(dict.findIndexes(fieldKeys)) { } -void CsvSampleOutputStream::write(Sample values) { +void CsvSampleOutputStream::write(SampleRecord values) { + // Skip empty records + if (values.empty()) { + return; + } + + if (fields.empty()) { + KeyDictionary::index_t index = 0; + auto ptr = values.begin(); + while (ptr != values.end()) { + auto o = *ptr; + + if (o) { + fields.push_back(index); + } + + ptr++; + index++; + } + } + if (!headerWritten) { writeHeader(); headerWritten = true; @@ -31,27 +63,17 @@ void CsvSampleOutputStream::write(Sample values) { auto &s = *stream.get(); - if (filterFields) { - auto i = fields.begin(); - while (i != fields.end()) { - if (i != fields.begin()) { - s << ","; - } - - auto field = *i++; - auto value = values.find(field); - - if (value != values.end()) { - s << value->second; - } + auto i = fields.begin(); + while (i != fields.end()) { + if (i != fields.begin()) { + s << ","; } - } else { - for (auto i = values.begin(); i != values.end();) { - s << "\"" << (*i).second << "\""; - if (++i != values.end()) { - s << ","; - } + auto index = *i++; + auto o = values.at(index); + + if (o) { + s << o.get(); } } @@ -59,15 +81,11 @@ void CsvSampleOutputStream::write(Sample values) { } void CsvSampleOutputStream::writeHeader() { - if (fields.size() == 0) { - return; - } - auto &s = *stream.get(); auto i = fields.begin(); while (i != fields.end()) { - s << *i; + s << dict.nameOf(*i); i++; @@ -79,95 +97,112 @@ void CsvSampleOutputStream::writeHeader() { s << endl; } -JsonSampleOutputStream::JsonSampleOutputStream(unique_ptr stream) : - stream(move(stream)), fields(), filterFields(false) { +JsonSampleOutputStream::JsonSampleOutputStream(KeyDictionary &dict, unique_ptr stream) : + dict(dict), stream(move(stream)), filterFields(false) { } -JsonSampleOutputStream::JsonSampleOutputStream(unique_ptr stream, vector fields) : - stream(move(stream)), fields(fields), filterFields(true) { +JsonSampleOutputStream::JsonSampleOutputStream(KeyDictionary &dict, unique_ptr stream, vector fields) + : + dict(dict), stream(move(stream)), fields(dict.findIndexes(fields)), filterFields(true) { } -void JsonSampleOutputStream::write(Sample values) { - json doc({}); +void JsonSampleOutputStream::write(SampleRecord values) { + throw sample_exception("deimplemented"); - if (filterFields) { - for (auto &f: fields) { - auto value = values.find(f); + json doc({}); - if (value != values.end()) { - doc[f] = value->second; - } - } - } else { - for (auto &v: values) { - doc[v.first] = v.second; - } - } +// if (filterFields) { +// for (auto &f: fields) { +// auto value = values.find(f); +// +// if (value != values.end()) { +// doc[f] = value->second; +// } +// } +// } else { +// for (auto &v: values) { +// doc[v.first] = v.second; +// } +// } *stream.get() << doc << endl; } -SqlSampleOutputStream::SqlSampleOutputStream(unique_ptr stream, string table_name) : - stream(move(stream)), table_name(table_name), filter_fields(false) { +SqlSampleOutputStream::SqlSampleOutputStream(KeyDictionary &dict, unique_ptr stream, string table_name) : + dict(dict), stream(move(stream)), table_name(table_name), filter_fields(false) { } -SqlSampleOutputStream::SqlSampleOutputStream(unique_ptr stream, string table_name, vector fields) : - stream(move(stream)), table_name(table_name), fields(fields), filter_fields(true) { +SqlSampleOutputStream::SqlSampleOutputStream(KeyDictionary &dict, unique_ptr stream, string table_name, vector fields) + : + dict(dict), + stream(move(stream)), + table_name(table_name), + fields(dict.findIndexes(fields)), + filter_fields(true) { } -void SqlSampleOutputStream::write(Sample values) { - string fs, vs; - - fs.reserve(1024); - vs.reserve(1024); - - if (filter_fields) { - auto i = fields.begin(); - - while (i != fields.end()) { - auto field = *i; - - fs += field; - - auto value = values.find(field); - - if (value != values.end()) { - vs += "'" + value->second + "'"; - } else { - vs += "NULL"; - } - - i++; +void SqlSampleOutputStream::write(SampleRecord values) { + throw sample_exception("deimplemented"); + +// string fs, vs; +// +// fs.reserve(1024); +// vs.reserve(1024); +// +// if (filter_fields) { +// auto i = fields.begin(); +// +// while (i != fields.end()) { +// auto field = *i; +// +// fs += field; +// +// auto value = values.find(field); +// +// if (value != values.end()) { +// vs += "'" + value->second + "'"; +// } else { +// vs += "NULL"; +// } +// +// i++; +// +// if (i != fields.end()) { +// fs += ","; +// vs += ","; +// } +// } +// } else { +// auto i = values.begin(); +// while (i != values.end()) { +// auto v = *i++; +// +// fs += v.first; +// vs += "'" + v.second + "'"; +// +// if (i != values.end()) { +// fs += ","; +// vs += ","; +// } +// } +// } +// +// (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl; +} - if (i != fields.end()) { - fs += ","; - vs += ","; - } - } - } else { - auto i = values.begin(); - while (i != values.end()) { - auto v = *i++; +void CsvSampleParser::process(mutable_buffers_1 buffer) { - fs += v.first; - vs += "'" + v.second + "'"; + size_t size = buffer_size(buffer); - if (i != values.end()) { - fs += ","; - vs += ","; - } - } + if (size == 0 && line->size()) { + process_line(line); + line = make_shared>(); + return; } - (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl; -} - -void CsvSampleParser::process(mutable_buffers_1 buffer) { - - size_t some = buffer_size(buffer); auto data = boost::asio::buffer_cast(buffer); - for (int i = 0; i < some; i++) { + for (int i = 0; i < size; i++) { uint8_t b = data[i]; if (b == packet_delimiter) { @@ -191,7 +226,7 @@ void CsvSampleParser::process_line(shared_ptr> packet) { boost::match_results what; boost::match_flag_type flags = boost::match_default; - Sample sample; + SampleRecord sample(dict); while (regex_search(start, end, what, e, flags)) { auto key = static_cast(what[1]); @@ -201,19 +236,18 @@ void CsvSampleParser::process_line(shared_ptr> packet) { map values; values[key] = value; - sample.set(key, value); + auto index = dict.indexOf(key); + sample.set(index, value); flags |= boost::match_prev_avail; flags |= boost::match_not_bob; } - if (sample.begin() != sample.end()) { - output->write(sample); - } + output->write(sample); } -AutoSampleParser::AutoSampleParser(shared_ptr output) : - SampleStreamParser(sample_format_type::AUTO), csvParser(new CsvSampleParser(output)) { +AutoSampleParser::AutoSampleParser(KeyDictionary &dict, shared_ptr output) : + SampleStreamParser(sample_format_type::AUTO), csvParser(new CsvSampleParser(dict, output)) { // Directly select the parser now until we have more than one parser parser = std::move(csvParser); type_ = sample_format_type::CSV; @@ -240,34 +274,42 @@ string to_string(const sample_format_type &arg) { throw std::runtime_error("Unknown format value: " + to_string(arg)); } -unique_ptr open_sample_input_stream(shared_ptr output, sample_format_type type) { +unique_ptr open_sample_input_stream( + KeyDictionary &dict, + shared_ptr output, + sample_format_type type) { if (type == sample_format_type::CSV) { - return make_unique(output); + return make_unique(dict, output); } else if (type == sample_format_type::AUTO) { - return make_unique(output); + return make_unique(dict, output); } else { throw sample_exception("Unsupported format type: " + to_string(type)); } } -unique_ptr open_sample_output_stream(sample_format_type type, unique_ptr output, boost::optional> fields) { +unique_ptr open_sample_output_stream( + KeyDictionary &dict, + sample_format_type type, + unique_ptr output, + o> fields) { + if (type == sample_format_type::CSV) { if (fields) { - return make_unique(move(output), fields.get()); + return make_unique(dict, move(output), fields.get()); } else { - return make_unique(move(output)); + return make_unique(dict, move(output)); } } else if (type == sample_format_type::JSON) { if (fields) { - return make_unique(move(output), fields.get()); + return make_unique(dict, move(output), fields.get()); } else { - return make_unique(move(output)); + return make_unique(dict, move(output)); } // } else if (type == sample_format_type::SQL) { // if (fields) { -// return make_unique(move(output), table_name, fields.get()); +// return make_unique(dict, move(output), table_name, fields.get()); // } else { -// return make_unique(move(output), table_name); +// return make_unique(dict, move(output), table_name); // } } else { throw sample_exception("Unsupported format type: " + to_string(type)); diff --git a/apps/SoilMoistureIo.h b/apps/SoilMoistureIo.h index b8f08e9..b8f0b52 100644 --- a/apps/SoilMoistureIo.h +++ b/apps/SoilMoistureIo.h @@ -33,121 +33,216 @@ class SampleStreamParser; class SampleOutputStream; -unique_ptr open_sample_input_stream(shared_ptr output, sample_format_type type = sample_format_type::AUTO); +class KeyDictionary; -unique_ptr open_sample_output_stream(sample_format_type type, unique_ptr output, - o> fields = o>()); +class SampleKey; -class Sample { +unique_ptr open_sample_input_stream( + KeyDictionary &dict, + shared_ptr output, + sample_format_type type = sample_format_type::AUTO); + +unique_ptr open_sample_output_stream( + KeyDictionary &dict, + sample_format_type type, + unique_ptr output, + o> fields = o>()); + +class sample_exception : public runtime_error { public: - Sample() : entries() { + sample_exception(const string &what) : runtime_error(what) { } +}; - Sample(map entries) : entries(entries) { +struct SampleKey { + // TODO: only the dictionary should be able to create keys + SampleKey(string &name) : name(name) { + if (name.length() == 0) { + throw sample_exception("Bad sample key."); + } } - map::iterator find(string &s) { - return entries.find(s); + inline + bool operator==(const SampleKey &that) const { + return name == that.name; } - map::iterator begin() { - return entries.begin(); - } + string name; +}; + +class KeyDictionary { +public: + typedef vector v; + typedef v::size_type index_t; - map::iterator end() { - return entries.end(); + KeyDictionary() { } - /** - * @throws std::out_of_range - */ - inline const string &operator[](string key) { - return at(key); + index_t indexOf(const SampleKey key) { + index_t i = 0; + for (auto ptr = keys.begin(); ptr != keys.end(); ptr++, i++) { + if (*ptr == key) { + return i; + } + } + + keys.push_back(key); + + return keys.size() - 1; } - /** - * @throws std::out_of_range - */ - const string &at(string key) { - return entries.at(key); + vector findIndexes(v keys); + + inline + v::const_iterator begin() { + return keys.begin(); } - template - const A lexical_at(string key) { - return boost::lexical_cast(entries.at(key)); + inline + v::const_iterator end() { + return keys.end(); } - void set(const std::string &key, const std::string &value) { - entries[key] = value; + string nameOf(index_t index) { + return keys.at(index).name; } private: - map entries; + v keys; }; -class sample_exception : public runtime_error { +class SampleRecord { public: - sample_exception(const string &what) : runtime_error(what) { + typedef vector> vec; + + SampleRecord(KeyDictionary &dict) : dict(dict) { + } + + SampleRecord(KeyDictionary &dict, vec values) + : dict(dict), values(values) { + } + + inline + vec::const_iterator begin() { + return values.begin(); + } + + inline + vec::const_iterator end() { + return values.end(); } + + inline + bool empty() { + return values.empty(); + } + + o at(size_t index) { + if (index >= values.size()) { + return o(); + } + + return values.at(index); + } + + void set(const KeyDictionary::index_t index, const std::string &value) { + values.resize(max(values.size(), index + 1)); + + values[index] = o(value); + } + + template + const o lexical_at(KeyDictionary::index_t index) { + auto value = at(index); + + if (!value) { + return o(); + } + + return o(boost::lexical_cast(value.get())); + } + + string to_string() { + KeyDictionary::index_t i = 0; + string s; + for (auto ptr = values.begin(); ptr != values.end(); ptr++, i++) { + auto o = *ptr; + + if (!o) { + continue; + } + + auto value = o.get(); + + s += dict.nameOf(i) + " = " + value + ", "; + } + return s; + } + +private: + KeyDictionary &dict; + vec values; }; class SampleOutputStream { public: - virtual void write(Sample sample) = 0; + virtual void write(SampleRecord sample) = 0; }; class VectorSampleOutputStream : public SampleOutputStream { public: - virtual void write(Sample sample) override; + virtual void write(SampleRecord sample) override; public: - vector samples; + vector samples; }; class CsvSampleOutputStream : public SampleOutputStream { public: - CsvSampleOutputStream(unique_ptr stream); + CsvSampleOutputStream(KeyDictionary &dict, unique_ptr stream); - CsvSampleOutputStream(unique_ptr stream, vector fields); + CsvSampleOutputStream(KeyDictionary &dict, unique_ptr stream, vector fields); - void write(Sample values); + void write(SampleRecord values); private: void writeHeader(); + KeyDictionary &dict; unique_ptr stream; bool headerWritten; - bool filterFields; - vector fields; + vector fields; }; class JsonSampleOutputStream : public SampleOutputStream { public: - JsonSampleOutputStream(unique_ptr stream); + JsonSampleOutputStream(KeyDictionary &dict, unique_ptr stream); - JsonSampleOutputStream(unique_ptr stream, vector fields); + JsonSampleOutputStream(KeyDictionary &dict, unique_ptr stream, vector fields); - void write(Sample values); + void write(SampleRecord values); private: + KeyDictionary &dict; unique_ptr stream; bool filterFields; - vector fields; + vector fields; }; class SqlSampleOutputStream : public SampleOutputStream { public: - SqlSampleOutputStream(unique_ptr stream, string table_name); + SqlSampleOutputStream(KeyDictionary &dict, unique_ptr stream, string table_name); - SqlSampleOutputStream(unique_ptr stream, string table_name, vector fields); + SqlSampleOutputStream(KeyDictionary &dict, unique_ptr stream, string table_name, vector fields); - void write(Sample values); + void write(SampleRecord values); private: + KeyDictionary &dict; unique_ptr stream; bool filter_fields; - vector fields; + vector fields; const string table_name; }; @@ -169,8 +264,9 @@ protected: class CsvSampleParser : public SampleStreamParser { public: - CsvSampleParser(shared_ptr output) : SampleStreamParser(sample_format_type::CSV), - output(output), line(make_shared>()) { + CsvSampleParser(KeyDictionary &dict, shared_ptr output) : + SampleStreamParser(sample_format_type::CSV), dict(dict), output(output), + line(make_shared>()) { } void process(mutable_buffers_1 buffer) override; @@ -179,13 +275,14 @@ private: void process_line(shared_ptr> packet); static const uint8_t packet_delimiter = '\n'; + KeyDictionary &dict; shared_ptr output; shared_ptr> line; }; class AutoSampleParser : public SampleStreamParser { public: - AutoSampleParser(shared_ptr output); + AutoSampleParser(KeyDictionary &dict, shared_ptr output); private: unique_ptr parser; diff --git a/apps/sample-convert.cpp b/apps/sample-convert.cpp index 249b737..b3e5c02 100644 --- a/apps/sample-convert.cpp +++ b/apps/sample-convert.cpp @@ -35,6 +35,7 @@ public: auto desc = execution.desc; auto vm = execution.vm; + KeyDictionary dict; shared_ptr output; istream *inputStream; @@ -60,22 +61,22 @@ public: } if (output_format == "plain") { - output = make_shared(move(outputStream)); + output = make_shared(dict, move(outputStream)); } else if (output_format == "json") { - output = make_shared(move(outputStream)); + output = make_shared(dict, move(outputStream)); } else if (output_format == "sql") { if (table_name.size() == 0) { cerr << "Missing option: table-name" << endl; return EXIT_FAILURE; } - output = make_shared(move(outputStream), table_name); + output = make_shared(dict, move(outputStream), table_name); } else { cerr << "Unsupported output format: " << output_format << endl; return EXIT_FAILURE; } - auto input = make_shared(output); + auto input = make_shared(dict, output); char data[100]; while (!inputStream->eof()) { diff --git a/apps/sample-timestamp.cpp b/apps/sample-timestamp.cpp index 3a0b3e0..6ac2f86 100644 --- a/apps/sample-timestamp.cpp +++ b/apps/sample-timestamp.cpp @@ -14,21 +14,27 @@ namespace po = boost::program_options; class TimestampFixingSampleOutputStream : public SampleOutputStream { public: - TimestampFixingSampleOutputStream(string timestamp_name, string now_name, time_t start_time, shared_ptr output) : - timestamp_name_(timestamp_name), now_name_(now_name), start_time_(start_time), output_(output) { + TimestampFixingSampleOutputStream(KeyDictionary dict, string timestamp_name, string now_name, time_t start_time, shared_ptr output) : + timestamp_index(dict.indexOf(timestamp_name)), now_index(dict.indexOf(now_name)), start_time_(start_time), output_(output) { } - virtual void write(Sample sample) override { - long relative_time = sample.lexical_at(now_name_); + virtual void write(SampleRecord sample) override { + o relative_time_o = sample.lexical_at(now_index); + + if (!relative_time_o) { + return; + } + + long relative_time = relative_time_o.get(); string new_value = std::to_string(start_time_ + relative_time); - sample.set(timestamp_name_, new_value); + sample.set(timestamp_index, new_value); output_->write(sample); }; private: - string now_name_, timestamp_name_; + KeyDictionary::index_t now_index, timestamp_index; time_t start_time_; shared_ptr output_; }; @@ -37,6 +43,7 @@ class sample_timestamp : public app { private: string input_file, timestamp_name, now_name; + KeyDictionary::index_t now_index; public: sample_timestamp() : input_file("") { @@ -69,19 +76,18 @@ public: return EXIT_FAILURE; } + KeyDictionary dict; + + now_index = dict.indexOf(now_name); + auto sample_buffer = make_shared(); - unique_ptr parser = open_sample_input_stream(sample_buffer); - while (!input.eof()) { + unique_ptr parser = open_sample_input_stream(dict, sample_buffer); + while (!input.fail()) { char buffer[buffer_size]; input.read(buffer, buffer_size); + auto count = (size_t) input.gcount(); - if (input.bad()) { - cerr << "Error reading input" << endl; - return EXIT_FAILURE; - } - - size_t count = (size_t) input.gcount(); - + cerr << "eof? " << input.eof() << endl; mutable_buffers_1 b = boost::asio::buffer(buffer, count); parser->process(b); } @@ -93,21 +99,20 @@ public: time_t end_time = buf.st_mtim.tv_sec; - Sample sample = *--sample_buffer->samples.end(); + SampleRecord sample = *--sample_buffer->samples.end(); - string s; - try { - s = sample.at(now_name); - } catch (out_of_range &e) { + o s = sample.at(now_index); + if (!s) { cerr << "Missing key '" + now_name + "'." << endl; + cerr << "keys: " << sample.to_string() << endl; return EXIT_FAILURE; } long now; try { - now = boost::lexical_cast(s); + now = boost::lexical_cast(s.get()); } catch (const boost::bad_lexical_cast &e) { - cerr << "Bad integer value '" + s + "'." << endl; + cerr << "Bad integer value '" + s.get() + "'." << endl; return EXIT_FAILURE; } @@ -124,16 +129,16 @@ public: return EXIT_FAILURE; } - auto output_stream = open_sample_output_stream(parser->type(), unique_ptr(&cout)); - auto p = make_shared("timestamp", now_name, start_time, move(output_stream)); - parser = open_sample_input_stream(p, parser->type()); + auto output_stream = open_sample_output_stream(dict, parser->type(), unique_ptr(&cout)); + auto p = make_shared(dict, "timestamp", now_name, start_time, move(output_stream)); + parser = open_sample_input_stream(dict, p, parser->type()); int recordCount = 0; while (!input.eof()) { char buffer[buffer_size]; - - size_t gcount = (size_t)input.readsome(buffer, buffer_size); + input.read(buffer, buffer_size); + size_t gcount = (size_t)input.gcount(); recordCount++; diff --git a/apps/sm-serial-read.cpp b/apps/sm-serial-read.cpp index 04a718d..c7fb695 100644 --- a/apps/sm-serial-read.cpp +++ b/apps/sm-serial-read.cpp @@ -101,6 +101,8 @@ public: auto desc = execution.desc; auto vm = execution.vm; + KeyDictionary dict; + uint32_t baud_rate = 115200; auto port_name = vm["port"].as(); @@ -124,17 +126,17 @@ public: unique_ptr outputStream = unique_ptr(&cout); if (format == Format::JSON) { - output = make_shared(std::move(outputStream)); + output = make_shared(dict, std::move(outputStream)); } else if (format == Format::SQL) { - output = make_shared(std::move(outputStream), "raw"); + output = make_shared(dict, std::move(outputStream), "raw"); } else if (format == Format::PLAIN) { - output = make_shared(std::move(outputStream)); + output = make_shared(dict, std::move(outputStream)); } else { cerr << "Unsupported format: " << boost::lexical_cast(format) << endl; return EXIT_FAILURE; } - shared_ptr input = make_shared(output); + shared_ptr input = make_shared(dict, output); port_handler(port_name, port, input).run(); -- cgit v1.2.3