aboutsummaryrefslogtreecommitdiff
path: root/apps/SoilMoistureIo.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'apps/SoilMoistureIo.cpp')
-rw-r--r--apps/SoilMoistureIo.cpp122
1 files changed, 96 insertions, 26 deletions
diff --git a/apps/SoilMoistureIo.cpp b/apps/SoilMoistureIo.cpp
index fe485b3..315f48d 100644
--- a/apps/SoilMoistureIo.cpp
+++ b/apps/SoilMoistureIo.cpp
@@ -11,12 +11,16 @@ namespace soil_moisture {
using namespace std;
using json = nlohmann::json;
-CsvSampleOutputStream::CsvSampleOutputStream(ostream &stream) :
- stream(stream), filterFields(false), headerWritten(false) {
+void VectorSampleOutputStream::write(Sample sample) {
+ samples.emplace_back(sample);
}
-CsvSampleOutputStream::CsvSampleOutputStream(ostream &stream, vector<string> fields) :
- stream(stream), fields(fields), filterFields(true), headerWritten(false) {
+CsvSampleOutputStream::CsvSampleOutputStream(unique_ptr<ostream> stream) :
+ stream(move(stream)), filterFields(false), headerWritten(false) {
+}
+
+CsvSampleOutputStream::CsvSampleOutputStream(unique_ptr<ostream> stream, vector<string> fields) :
+ stream(move(stream)), fields(fields), filterFields(true), headerWritten(false) {
}
void CsvSampleOutputStream::write(Sample values) {
@@ -25,31 +29,33 @@ void CsvSampleOutputStream::write(Sample values) {
headerWritten = true;
}
+ auto &s = *stream.get();
+
if (filterFields) {
auto i = fields.begin();
while (i != fields.end()) {
if (i != fields.begin()) {
- stream << ",";
+ s << ",";
}
auto field = *i++;
auto value = values.find(field);
if (value != values.end()) {
- stream << value->second;
+ s << value->second;
}
}
} else {
for (auto i = values.begin(); i != values.end();) {
- stream << "\"" << (*i).second << "\"";
+ s << "\"" << (*i).second << "\"";
if (++i != values.end()) {
- stream << ",";
+ s << ",";
}
}
}
- stream << endl;
+ s << endl;
}
void CsvSampleOutputStream::writeHeader() {
@@ -57,26 +63,28 @@ void CsvSampleOutputStream::writeHeader() {
return;
}
+ auto &s = *stream.get();
+
auto i = fields.begin();
while (i != fields.end()) {
- stream << *i;
+ s << *i;
i++;
if (i != fields.end()) {
- stream << ",";
+ s << ",";
}
}
- stream << endl;
+ s << endl;
}
-JsonSampleOutputStream::JsonSampleOutputStream(ostream &stream) :
- stream(stream), fields(), filterFields(false) {
+JsonSampleOutputStream::JsonSampleOutputStream(unique_ptr<ostream> stream) :
+ stream(move(stream)), fields(), filterFields(false) {
}
-JsonSampleOutputStream::JsonSampleOutputStream(ostream &stream, vector<string> fields) :
- stream(stream), fields(fields), filterFields(true) {
+JsonSampleOutputStream::JsonSampleOutputStream(unique_ptr<ostream> stream, vector<string> fields) :
+ stream(move(stream)), fields(fields), filterFields(true) {
}
void JsonSampleOutputStream::write(Sample values) {
@@ -96,15 +104,15 @@ void JsonSampleOutputStream::write(Sample values) {
}
}
- stream << doc << endl;
+ *stream.get() << doc << endl;
}
-SqlSampleOutputStream::SqlSampleOutputStream(ostream &stream, string table_name) :
- stream(stream), table_name(table_name), filter_fields(false) {
+SqlSampleOutputStream::SqlSampleOutputStream(unique_ptr<ostream> stream, string table_name) :
+ stream(move(stream)), table_name(table_name), filter_fields(false) {
}
-SqlSampleOutputStream::SqlSampleOutputStream(ostream &stream, string table_name, vector<string> fields) :
- stream(stream), table_name(table_name), fields(fields), filter_fields(true) {
+SqlSampleOutputStream::SqlSampleOutputStream(unique_ptr<ostream> stream, string table_name, vector<string> fields) :
+ stream(move(stream)), table_name(table_name), fields(fields), filter_fields(true) {
}
void SqlSampleOutputStream::write(Sample values) {
@@ -126,7 +134,7 @@ void SqlSampleOutputStream::write(Sample values) {
if (value != values.end()) {
vs += "'" + value->second + "'";
} else {
- vs + "NULL";
+ vs += "NULL";
}
i++;
@@ -151,10 +159,10 @@ void SqlSampleOutputStream::write(Sample values) {
}
}
- stream << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl;
+ (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl;
}
-void CsvParser::process(mutable_buffers_1 buffer) {
+void CsvSampleParser::process(mutable_buffers_1 buffer) {
size_t some = buffer_size(buffer);
auto data = boost::asio::buffer_cast<const uint8_t *>(buffer);
@@ -172,7 +180,7 @@ void CsvParser::process(mutable_buffers_1 buffer) {
}
-void CsvParser::process_line(shared_ptr<vector<uint8_t>> packet) {
+void CsvSampleParser::process_line(shared_ptr<vector<uint8_t>> packet) {
auto timestamp = std::chrono::system_clock::now().time_since_epoch().count();
auto s = std::string((char *) packet->data(), packet->size());
@@ -193,7 +201,7 @@ void CsvParser::process_line(shared_ptr<vector<uint8_t>> packet) {
map<string, string> values;
values[key] = value;
- sample[key] = value;
+ sample.set(key, value);
flags |= boost::match_prev_avail;
flags |= boost::match_not_bob;
@@ -204,5 +212,67 @@ void CsvParser::process_line(shared_ptr<vector<uint8_t>> packet) {
}
}
+AutoSampleParser::AutoSampleParser(shared_ptr<SampleOutputStream> output) :
+ SampleStreamParser(sample_format_type::AUTO), csvParser(new CsvSampleParser(output)) {
+ // Directly select the parser now until we have more than one parser
+ parser = std::move(csvParser);
+ type_ = sample_format_type::CSV;
+}
+
+void AutoSampleParser::process(mutable_buffers_1 buffer) {
+ if (parser) {
+ parser->process(buffer);
+ } else {
+ throw runtime_error("Not implemented yet");
+ }
+}
+
+string to_string(const sample_format_type &arg) {
+ if (arg == sample_format_type::AUTO)
+ return "auto";
+ else if (arg == sample_format_type::CSV)
+ return "csv";
+ else if (arg == sample_format_type::JSON)
+ return "json";
+ else if (arg == sample_format_type::SQL)
+ return "sql";
+ else
+ throw std::runtime_error("Unknown format value: " + to_string(arg));
+}
+
+unique_ptr<SampleStreamParser> open_sample_input_stream(shared_ptr<SampleOutputStream> output, sample_format_type type) {
+ if (type == sample_format_type::CSV) {
+ return make_unique<CsvSampleParser>(output);
+ } else if (type == sample_format_type::AUTO) {
+ return make_unique<AutoSampleParser>(output);
+ } else {
+ throw sample_exception("Unsupported format type: " + to_string(type));
+ }
+}
+
+unique_ptr<SampleOutputStream> open_sample_output_stream(sample_format_type type, unique_ptr<ostream> output, boost::optional<vector<string>> fields) {
+ if (type == sample_format_type::CSV) {
+ if (fields) {
+ return make_unique<CsvSampleOutputStream>(move(output), fields.get());
+ } else {
+ return make_unique<CsvSampleOutputStream>(move(output));
+ }
+ } else if (type == sample_format_type::JSON) {
+ if (fields) {
+ return make_unique<JsonSampleOutputStream>(move(output), fields.get());
+ } else {
+ return make_unique<JsonSampleOutputStream>(move(output));
+ }
+// } else if (type == sample_format_type::SQL) {
+// if (fields) {
+// return make_unique<SqlSampleOutputStream>(move(output), table_name, fields.get());
+// } else {
+// return make_unique<SqlSampleOutputStream>(move(output), table_name);
+// }
+ } else {
+ throw sample_exception("Unsupported format type: " + to_string(type));
+ }
+}
+
}
}