aboutsummaryrefslogtreecommitdiff
path: root/apps/SoilMoistureIo.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'apps/SoilMoistureIo.cpp')
-rw-r--r--apps/SoilMoistureIo.cpp157
1 files changed, 64 insertions, 93 deletions
diff --git a/apps/SoilMoistureIo.cpp b/apps/SoilMoistureIo.cpp
index b8a8b64..ad8a3bb 100644
--- a/apps/SoilMoistureIo.cpp
+++ b/apps/SoilMoistureIo.cpp
@@ -15,40 +15,26 @@ void VectorSampleOutputStream::write(SampleRecord sample) {
samples.emplace_back(sample);
}
-vector<KeyDictionary::index_t> KeyDictionary::findIndexes(vector<SampleKey> keys) {
- vector<KeyDictionary::index_t> indexes;
-
- for (auto &key: keys) {
- auto index = indexOf(key);
- indexes.push_back(index);
- }
-
- return move(indexes);
-}
-
-CsvSampleOutputStream::CsvSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream) :
- dict(dict), stream(move(stream)), headerWritten(false) {
+CsvSampleOutputStream::CsvSampleOutputStream(unique_ptr<ostream> stream, KeyDictionary &dict)
+ : stream(move(stream)), headerWritten(false), dict(dict) {
}
-CsvSampleOutputStream::CsvSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream, vector<SampleKey> fieldKeys)
- :
- dict(dict), stream(move(stream)), headerWritten(false), fields(dict.findIndexes(fieldKeys)) {
-}
-
-void CsvSampleOutputStream::write(SampleRecord values) {
+void CsvSampleOutputStream::write(SampleRecord sample) {
// Skip empty records
- if (values.empty()) {
+ if (sample.empty()) {
return;
}
- if (fields.empty()) {
- KeyDictionary::index_t index = 0;
- auto ptr = values.begin();
- while (ptr != values.end()) {
+ // Build the dict with the keys from the first sample.
+ if (dict.empty()) {
+ SampleKeyIndex index = 0;
+ auto ptr = sample.begin();
+ while (ptr != sample.end()) {
auto o = *ptr;
if (o) {
- fields.push_back(index);
+ auto name = sample.dict.at(index)->name;
+ dict.indexOf(name);
}
ptr++;
@@ -63,14 +49,15 @@ void CsvSampleOutputStream::write(SampleRecord values) {
auto &s = *stream.get();
- auto i = fields.begin();
- while (i != fields.end()) {
- if (i != fields.begin()) {
+ auto it = dict.begin();
+ while (it != dict.end()) {
+ if (it != dict.begin()) {
s << ",";
}
- auto index = *i++;
- auto o = values.at(index);
+ auto key = *it++;
+ auto sampleKey = sample.dict.indexOf(key->name);
+ auto o = sample.at(sampleKey);
if (o) {
s << o.get();
@@ -83,13 +70,13 @@ void CsvSampleOutputStream::write(SampleRecord values) {
void CsvSampleOutputStream::writeHeader() {
auto &s = *stream.get();
- auto i = fields.begin();
- while (i != fields.end()) {
- s << dict.nameOf(*i);
+ auto i = dict.begin();
+ while (i != dict.end()) {
+ s << (*i)->name;
i++;
- if (i != fields.end()) {
+ if (i != dict.end()) {
s << ",";
}
}
@@ -97,48 +84,45 @@ void CsvSampleOutputStream::writeHeader() {
s << endl;
}
-JsonSampleOutputStream::JsonSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream) :
- dict(dict), stream(move(stream)), filterFields(false) {
+JsonSampleOutputStream::JsonSampleOutputStream(unique_ptr<ostream> stream, KeyDictionary &dict) :
+ dict(dict), stream(move(stream)) {
}
-JsonSampleOutputStream::JsonSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream, vector<SampleKey> fields)
- :
- dict(dict), stream(move(stream)), fields(dict.findIndexes(fields)), filterFields(true) {
-}
-
-void JsonSampleOutputStream::write(SampleRecord values) {
- throw sample_exception("deimplemented");
+void JsonSampleOutputStream::write(SampleRecord sample) {
+ // Skip empty records
+ if (sample.empty()) {
+ return;
+ }
json doc({});
-// if (filterFields) {
-// for (auto &f: fields) {
-// auto value = values.find(f);
-//
-// if (value != values.end()) {
-// doc[f] = value->second;
-// }
-// }
-// } else {
-// for (auto &v: values) {
-// doc[v.first] = v.second;
-// }
-// }
+ if (!dict.empty()) {
+ for (auto &key: dict) {
+ auto sampleKey = sample.dict.indexOf(key->name);
- *stream.get() << doc << endl;
-}
+ auto value = sample.at(sampleKey);
+
+ if (value) {
+ doc[key->name] = value.get();
+ }
+ }
+ } else {
+ for (auto &sampleKey: sample.dict) {
+ auto o = sample.at(sampleKey);
+
+ if (o) {
+ // Make sure that the key is registered in the dictionary
+ dict.indexOf(sampleKey->name);
+ doc[sampleKey->name] = o.get();
+ }
+ }
+ }
-SqlSampleOutputStream::SqlSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream, string table_name) :
- dict(dict), stream(move(stream)), table_name(table_name), filter_fields(false) {
+ *stream.get() << doc << endl;
}
-SqlSampleOutputStream::SqlSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream, string table_name, vector<SampleKey> fields)
- :
- dict(dict),
- stream(move(stream)),
- table_name(table_name),
- fields(dict.findIndexes(fields)),
- filter_fields(true) {
+SqlSampleOutputStream::SqlSampleOutputStream(unique_ptr<ostream> stream, KeyDictionary &dict, string table_name) :
+ dict(dict), stream(move(stream)), table_name(table_name) {
}
void SqlSampleOutputStream::write(SampleRecord values) {
@@ -229,15 +213,15 @@ void CsvSampleParser::process_line(shared_ptr<vector<uint8_t>> packet) {
SampleRecord sample(dict);
while (regex_search(start, end, what, e, flags)) {
- auto key = static_cast<string>(what[1]);
+ auto name = static_cast<string>(what[1]);
auto value = static_cast<string>(what[2]);
start = what[0].second;
map<string, string> values;
- values[key] = value;
+ values[name] = value;
- auto index = dict.indexOf(key);
- sample.set(index, value);
+ auto key = dict.indexOf(name);
+ sample.set(key, value);
flags |= boost::match_prev_avail;
flags |= boost::match_not_bob;
@@ -246,8 +230,8 @@ void CsvSampleParser::process_line(shared_ptr<vector<uint8_t>> packet) {
output->write(sample);
}
-AutoSampleParser::AutoSampleParser(KeyDictionary &dict, shared_ptr<SampleOutputStream> output) :
- SampleStreamParser(sample_format_type::AUTO), csvParser(new CsvSampleParser(dict, output)) {
+AutoSampleParser::AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) :
+ SampleStreamParser(sample_format_type::AUTO), csvParser(new CsvSampleParser(output, dict)) {
// Directly select the parser now until we have more than one parser
parser = std::move(csvParser);
type_ = sample_format_type::CSV;
@@ -279,38 +263,25 @@ unique_ptr<SampleStreamParser> open_sample_input_stream(
shared_ptr<SampleOutputStream> output,
sample_format_type type) {
if (type == sample_format_type::CSV) {
- return make_unique<CsvSampleParser>(dict, output);
+ return make_unique<CsvSampleParser>(output, dict);
} else if (type == sample_format_type::AUTO) {
- return make_unique<AutoSampleParser>(dict, output);
+ return make_unique<AutoSampleParser>(output, dict);
} else {
throw sample_exception("Unsupported format type: " + to_string(type));
}
}
unique_ptr<SampleOutputStream> open_sample_output_stream(
- KeyDictionary &dict,
- sample_format_type type,
unique_ptr<ostream> output,
- o<vector<SampleKey>> fields) {
+ KeyDictionary &dict,
+ sample_format_type type) {
if (type == sample_format_type::CSV) {
- if (fields) {
- return make_unique<CsvSampleOutputStream>(dict, move(output), fields.get());
- } else {
- return make_unique<CsvSampleOutputStream>(dict, move(output));
- }
+ return make_unique<CsvSampleOutputStream>(move(output), dict);
} else if (type == sample_format_type::JSON) {
- if (fields) {
- return make_unique<JsonSampleOutputStream>(dict, move(output), fields.get());
- } else {
- return make_unique<JsonSampleOutputStream>(dict, move(output));
- }
+ return make_unique<JsonSampleOutputStream>(move(output), dict);
// } else if (type == sample_format_type::SQL) {
-// if (fields) {
-// return make_unique<SqlSampleOutputStream>(dict, move(output), table_name, fields.get());
-// } else {
-// return make_unique<SqlSampleOutputStream>(dict, move(output), table_name);
-// }
+// return make_unique<SqlSampleOutputStream>(dict, move(output), table_name);
} else {
throw sample_exception("Unsupported format type: " + to_string(type));
}