aboutsummaryrefslogtreecommitdiff
path: root/apps/SoilMoistureIo.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'apps/SoilMoistureIo.cpp')
-rw-r--r--apps/SoilMoistureIo.cpp82
1 files changed, 80 insertions, 2 deletions
diff --git a/apps/SoilMoistureIo.cpp b/apps/SoilMoistureIo.cpp
index 66e1dce..4d2e03d 100644
--- a/apps/SoilMoistureIo.cpp
+++ b/apps/SoilMoistureIo.cpp
@@ -173,6 +173,58 @@ void KeyValueSampleOutputStream::write(SampleRecord const &sample) {
*stream.get() << endl;
}
+RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey* timestamp_key, o<output_fields *> output_fields) :
+ stream(move(stream)), timestamp_key(timestamp_key) {
+ if (output_fields) {
+ for (auto field : output_fields.get()->fields) {
+ keys.emplace_back(dict.indexOf(field));
+ }
+ } else {
+ for (auto key : dict) {
+ keys.emplace_back(key);
+ }
+ }
+}
+
+void RrdSampleOutputStream::write(SampleRecord const &sample) {
+ // Skip empty records
+ if (sample.empty()) {
+ return;
+ }
+
+ auto &s = *stream.get();
+
+ auto timestampO = sample.at(timestamp_key);
+
+ if (!timestampO) {
+ return;
+ }
+
+ auto timestamp = timestampO.get();
+
+ s << timestamp;
+
+ bool first = true;
+ for (auto &key: keys) {
+ if (key == timestamp_key) {
+ continue;
+ }
+
+ auto value = sample.at(key);
+
+ if (first) {
+ s << "@";
+ first = false;
+ } else {
+ s << ":";
+ }
+
+ s << (value ? value.get() : "U");
+ }
+
+ *stream.get() << endl;
+}
+
SqlSampleOutputStream::SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name) :
dict(dict), stream(move(stream)), table_name(table_name) {
}
@@ -305,8 +357,10 @@ string to_string(const sample_format_type &arg) {
return "key-value";
else if (arg == sample_format_type::SQL)
return "sql";
+ else if (arg == sample_format_type::RRD)
+ return "rrd";
else
- throw std::runtime_error("Unknown format value: " + to_string(arg));
+ return "unknown";
}
std::ostream& operator<<(std::ostream& os, sample_format_type const& type) {
@@ -327,6 +381,8 @@ std::istream& operator>>(std::istream& is, sample_format_type& type) {
type = sample_format_type::JSON;
} else if (s == "sql") {
type = sample_format_type::SQL;
+ } else if (s == "rrd") {
+ type = sample_format_type::RRD;
}
return is;
@@ -345,10 +401,24 @@ unique_ptr<SampleStreamParser> open_sample_input_stream(
}
}
+template<typename T>
+o<T *> find_option(vector<sample_output_stream_option *> &options) {
+ for (sample_output_stream_option *& option : options) {
+ T *x = dynamic_cast<T *>(option);
+
+ if (x != nullptr) {
+ return o<T *>(x);
+ }
+ }
+
+ return o<T *>();
+}
+
unique_ptr<SampleOutputStream> open_sample_output_stream(
shared_ptr<ostream> output,
KeyDictionary &dict,
- sample_format_type type) {
+ sample_format_type type,
+ vector<sample_output_stream_option *> options) {
if (type == sample_format_type::CSV) {
return make_unique<CsvSampleOutputStream>(output, dict);
@@ -356,6 +426,14 @@ unique_ptr<SampleOutputStream> open_sample_output_stream(
return make_unique<KeyValueSampleOutputStream>(output, dict);
} else if (type == sample_format_type::JSON) {
return make_unique<JsonSampleOutputStream>(output, dict);
+ } else if (type == sample_format_type::RRD) {
+ o<output_fields *> of = find_option<output_fields>(options);
+
+ o<timestamp_field *> tsf = find_option<timestamp_field>(options);
+
+ auto timestamp_key = dict.indexOf(tsf ? tsf.get()->name : "timestamp");
+
+ return make_unique<RrdSampleOutputStream>(output, dict, timestamp_key, of);
// } else if (type == sample_format_type::SQL) {
// return make_unique<SqlSampleOutputStream>(dict, move(output), table_name);
} else {