aboutsummaryrefslogtreecommitdiff
path: root/apps/SoilMoistureIo.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'apps/SoilMoistureIo.cpp')
-rw-r--r--apps/SoilMoistureIo.cpp272
1 files changed, 157 insertions, 115 deletions
diff --git a/apps/SoilMoistureIo.cpp b/apps/SoilMoistureIo.cpp
index 315f48d..b8a8b64 100644
--- a/apps/SoilMoistureIo.cpp
+++ b/apps/SoilMoistureIo.cpp
@@ -11,19 +11,51 @@ namespace soil_moisture {
using namespace std;
using json = nlohmann::json;
-void VectorSampleOutputStream::write(Sample sample) {
+void VectorSampleOutputStream::write(SampleRecord sample) {
samples.emplace_back(sample);
}
-CsvSampleOutputStream::CsvSampleOutputStream(unique_ptr<ostream> stream) :
- stream(move(stream)), filterFields(false), headerWritten(false) {
+vector<KeyDictionary::index_t> KeyDictionary::findIndexes(vector<SampleKey> keys) {
+ vector<KeyDictionary::index_t> indexes;
+
+ for (auto &key: keys) {
+ auto index = indexOf(key);
+ indexes.push_back(index);
+ }
+
+ return move(indexes);
+}
+
+CsvSampleOutputStream::CsvSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream) :
+ dict(dict), stream(move(stream)), headerWritten(false) {
}
-CsvSampleOutputStream::CsvSampleOutputStream(unique_ptr<ostream> stream, vector<string> fields) :
- stream(move(stream)), fields(fields), filterFields(true), headerWritten(false) {
+CsvSampleOutputStream::CsvSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream, vector<SampleKey> fieldKeys)
+ :
+ dict(dict), stream(move(stream)), headerWritten(false), fields(dict.findIndexes(fieldKeys)) {
}
-void CsvSampleOutputStream::write(Sample values) {
+void CsvSampleOutputStream::write(SampleRecord values) {
+ // Skip empty records
+ if (values.empty()) {
+ return;
+ }
+
+ if (fields.empty()) {
+ KeyDictionary::index_t index = 0;
+ auto ptr = values.begin();
+ while (ptr != values.end()) {
+ auto o = *ptr;
+
+ if (o) {
+ fields.push_back(index);
+ }
+
+ ptr++;
+ index++;
+ }
+ }
+
if (!headerWritten) {
writeHeader();
headerWritten = true;
@@ -31,27 +63,17 @@ void CsvSampleOutputStream::write(Sample values) {
auto &s = *stream.get();
- if (filterFields) {
- auto i = fields.begin();
- while (i != fields.end()) {
- if (i != fields.begin()) {
- s << ",";
- }
-
- auto field = *i++;
- auto value = values.find(field);
-
- if (value != values.end()) {
- s << value->second;
- }
+ auto i = fields.begin();
+ while (i != fields.end()) {
+ if (i != fields.begin()) {
+ s << ",";
}
- } else {
- for (auto i = values.begin(); i != values.end();) {
- s << "\"" << (*i).second << "\"";
- if (++i != values.end()) {
- s << ",";
- }
+ auto index = *i++;
+ auto o = values.at(index);
+
+ if (o) {
+ s << o.get();
}
}
@@ -59,15 +81,11 @@ void CsvSampleOutputStream::write(Sample values) {
}
void CsvSampleOutputStream::writeHeader() {
- if (fields.size() == 0) {
- return;
- }
-
auto &s = *stream.get();
auto i = fields.begin();
while (i != fields.end()) {
- s << *i;
+ s << dict.nameOf(*i);
i++;
@@ -79,95 +97,112 @@ void CsvSampleOutputStream::writeHeader() {
s << endl;
}
-JsonSampleOutputStream::JsonSampleOutputStream(unique_ptr<ostream> stream) :
- stream(move(stream)), fields(), filterFields(false) {
+JsonSampleOutputStream::JsonSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream) :
+ dict(dict), stream(move(stream)), filterFields(false) {
}
-JsonSampleOutputStream::JsonSampleOutputStream(unique_ptr<ostream> stream, vector<string> fields) :
- stream(move(stream)), fields(fields), filterFields(true) {
+JsonSampleOutputStream::JsonSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream, vector<SampleKey> fields)
+ :
+ dict(dict), stream(move(stream)), fields(dict.findIndexes(fields)), filterFields(true) {
}
-void JsonSampleOutputStream::write(Sample values) {
- json doc({});
+void JsonSampleOutputStream::write(SampleRecord values) {
+ throw sample_exception("deimplemented");
- if (filterFields) {
- for (auto &f: fields) {
- auto value = values.find(f);
+ json doc({});
- if (value != values.end()) {
- doc[f] = value->second;
- }
- }
- } else {
- for (auto &v: values) {
- doc[v.first] = v.second;
- }
- }
+// if (filterFields) {
+// for (auto &f: fields) {
+// auto value = values.find(f);
+//
+// if (value != values.end()) {
+// doc[f] = value->second;
+// }
+// }
+// } else {
+// for (auto &v: values) {
+// doc[v.first] = v.second;
+// }
+// }
*stream.get() << doc << endl;
}
-SqlSampleOutputStream::SqlSampleOutputStream(unique_ptr<ostream> stream, string table_name) :
- stream(move(stream)), table_name(table_name), filter_fields(false) {
+SqlSampleOutputStream::SqlSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream, string table_name) :
+ dict(dict), stream(move(stream)), table_name(table_name), filter_fields(false) {
}
-SqlSampleOutputStream::SqlSampleOutputStream(unique_ptr<ostream> stream, string table_name, vector<string> fields) :
- stream(move(stream)), table_name(table_name), fields(fields), filter_fields(true) {
+SqlSampleOutputStream::SqlSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream, string table_name, vector<SampleKey> fields)
+ :
+ dict(dict),
+ stream(move(stream)),
+ table_name(table_name),
+ fields(dict.findIndexes(fields)),
+ filter_fields(true) {
}
-void SqlSampleOutputStream::write(Sample values) {
- string fs, vs;
-
- fs.reserve(1024);
- vs.reserve(1024);
-
- if (filter_fields) {
- auto i = fields.begin();
-
- while (i != fields.end()) {
- auto field = *i;
-
- fs += field;
-
- auto value = values.find(field);
-
- if (value != values.end()) {
- vs += "'" + value->second + "'";
- } else {
- vs += "NULL";
- }
-
- i++;
+void SqlSampleOutputStream::write(SampleRecord values) {
+ throw sample_exception("deimplemented");
+
+// string fs, vs;
+//
+// fs.reserve(1024);
+// vs.reserve(1024);
+//
+// if (filter_fields) {
+// auto i = fields.begin();
+//
+// while (i != fields.end()) {
+// auto field = *i;
+//
+// fs += field;
+//
+// auto value = values.find(field);
+//
+// if (value != values.end()) {
+// vs += "'" + value->second + "'";
+// } else {
+// vs += "NULL";
+// }
+//
+// i++;
+//
+// if (i != fields.end()) {
+// fs += ",";
+// vs += ",";
+// }
+// }
+// } else {
+// auto i = values.begin();
+// while (i != values.end()) {
+// auto v = *i++;
+//
+// fs += v.first;
+// vs += "'" + v.second + "'";
+//
+// if (i != values.end()) {
+// fs += ",";
+// vs += ",";
+// }
+// }
+// }
+//
+// (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl;
+}
- if (i != fields.end()) {
- fs += ",";
- vs += ",";
- }
- }
- } else {
- auto i = values.begin();
- while (i != values.end()) {
- auto v = *i++;
+void CsvSampleParser::process(mutable_buffers_1 buffer) {
- fs += v.first;
- vs += "'" + v.second + "'";
+ size_t size = buffer_size(buffer);
- if (i != values.end()) {
- fs += ",";
- vs += ",";
- }
- }
+ if (size == 0 && line->size()) {
+ process_line(line);
+ line = make_shared<vector<uint8_t>>();
+ return;
}
- (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl;
-}
-
-void CsvSampleParser::process(mutable_buffers_1 buffer) {
-
- size_t some = buffer_size(buffer);
auto data = boost::asio::buffer_cast<const uint8_t *>(buffer);
- for (int i = 0; i < some; i++) {
+ for (int i = 0; i < size; i++) {
uint8_t b = data[i];
if (b == packet_delimiter) {
@@ -191,7 +226,7 @@ void CsvSampleParser::process_line(shared_ptr<vector<uint8_t>> packet) {
boost::match_results<std::string::const_iterator> what;
boost::match_flag_type flags = boost::match_default;
- Sample sample;
+ SampleRecord sample(dict);
while (regex_search(start, end, what, e, flags)) {
auto key = static_cast<string>(what[1]);
@@ -201,19 +236,18 @@ void CsvSampleParser::process_line(shared_ptr<vector<uint8_t>> packet) {
map<string, string> values;
values[key] = value;
- sample.set(key, value);
+ auto index = dict.indexOf(key);
+ sample.set(index, value);
flags |= boost::match_prev_avail;
flags |= boost::match_not_bob;
}
- if (sample.begin() != sample.end()) {
- output->write(sample);
- }
+ output->write(sample);
}
-AutoSampleParser::AutoSampleParser(shared_ptr<SampleOutputStream> output) :
- SampleStreamParser(sample_format_type::AUTO), csvParser(new CsvSampleParser(output)) {
+AutoSampleParser::AutoSampleParser(KeyDictionary &dict, shared_ptr<SampleOutputStream> output) :
+ SampleStreamParser(sample_format_type::AUTO), csvParser(new CsvSampleParser(dict, output)) {
// Directly select the parser now until we have more than one parser
parser = std::move(csvParser);
type_ = sample_format_type::CSV;
@@ -240,34 +274,42 @@ string to_string(const sample_format_type &arg) {
throw std::runtime_error("Unknown format value: " + to_string(arg));
}
-unique_ptr<SampleStreamParser> open_sample_input_stream(shared_ptr<SampleOutputStream> output, sample_format_type type) {
+unique_ptr<SampleStreamParser> open_sample_input_stream(
+ KeyDictionary &dict,
+ shared_ptr<SampleOutputStream> output,
+ sample_format_type type) {
if (type == sample_format_type::CSV) {
- return make_unique<CsvSampleParser>(output);
+ return make_unique<CsvSampleParser>(dict, output);
} else if (type == sample_format_type::AUTO) {
- return make_unique<AutoSampleParser>(output);
+ return make_unique<AutoSampleParser>(dict, output);
} else {
throw sample_exception("Unsupported format type: " + to_string(type));
}
}
-unique_ptr<SampleOutputStream> open_sample_output_stream(sample_format_type type, unique_ptr<ostream> output, boost::optional<vector<string>> fields) {
+unique_ptr<SampleOutputStream> open_sample_output_stream(
+ KeyDictionary &dict,
+ sample_format_type type,
+ unique_ptr<ostream> output,
+ o<vector<SampleKey>> fields) {
+
if (type == sample_format_type::CSV) {
if (fields) {
- return make_unique<CsvSampleOutputStream>(move(output), fields.get());
+ return make_unique<CsvSampleOutputStream>(dict, move(output), fields.get());
} else {
- return make_unique<CsvSampleOutputStream>(move(output));
+ return make_unique<CsvSampleOutputStream>(dict, move(output));
}
} else if (type == sample_format_type::JSON) {
if (fields) {
- return make_unique<JsonSampleOutputStream>(move(output), fields.get());
+ return make_unique<JsonSampleOutputStream>(dict, move(output), fields.get());
} else {
- return make_unique<JsonSampleOutputStream>(move(output));
+ return make_unique<JsonSampleOutputStream>(dict, move(output));
}
// } else if (type == sample_format_type::SQL) {
// if (fields) {
-// return make_unique<SqlSampleOutputStream>(move(output), table_name, fields.get());
+// return make_unique<SqlSampleOutputStream>(dict, move(output), table_name, fields.get());
// } else {
-// return make_unique<SqlSampleOutputStream>(move(output), table_name);
+// return make_unique<SqlSampleOutputStream>(dict, move(output), table_name);
// }
} else {
throw sample_exception("Unsupported format type: " + to_string(type));