aboutsummaryrefslogtreecommitdiff
path: root/apps
diff options
context:
space:
mode:
Diffstat (limited to 'apps')
-rw-r--r--apps/SoilMoistureIo.cpp82
-rw-r--r--apps/SoilMoistureIo.h62
-rw-r--r--apps/sample-convert.cpp24
-rw-r--r--apps/sample-timestamp.cpp3
4 files changed, 163 insertions, 8 deletions
diff --git a/apps/SoilMoistureIo.cpp b/apps/SoilMoistureIo.cpp
index 66e1dce..4d2e03d 100644
--- a/apps/SoilMoistureIo.cpp
+++ b/apps/SoilMoistureIo.cpp
@@ -173,6 +173,58 @@ void KeyValueSampleOutputStream::write(SampleRecord const &sample) {
*stream.get() << endl;
}
+RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey* timestamp_key, o<output_fields *> output_fields) :
+ stream(move(stream)), timestamp_key(timestamp_key) {
+ if (output_fields) {
+ for (auto field : output_fields.get()->fields) {
+ keys.emplace_back(dict.indexOf(field));
+ }
+ } else {
+ for (auto key : dict) {
+ keys.emplace_back(key);
+ }
+ }
+}
+
+void RrdSampleOutputStream::write(SampleRecord const &sample) {
+ // Skip empty records
+ if (sample.empty()) {
+ return;
+ }
+
+ auto &s = *stream.get();
+
+ auto timestampO = sample.at(timestamp_key);
+
+ if (!timestampO) {
+ return;
+ }
+
+ auto timestamp = timestampO.get();
+
+ s << timestamp;
+
+ bool first = true;
+ for (auto &key: keys) {
+ if (key == timestamp_key) {
+ continue;
+ }
+
+ auto value = sample.at(key);
+
+ if (first) {
+ s << "@";
+ first = false;
+ } else {
+ s << ":";
+ }
+
+ s << (value ? value.get() : "U");
+ }
+
+ *stream.get() << endl;
+}
+
SqlSampleOutputStream::SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name) :
dict(dict), stream(move(stream)), table_name(table_name) {
}
@@ -305,8 +357,10 @@ string to_string(const sample_format_type &arg) {
return "key-value";
else if (arg == sample_format_type::SQL)
return "sql";
+ else if (arg == sample_format_type::RRD)
+ return "rrd";
else
- throw std::runtime_error("Unknown format value: " + to_string(arg));
+ return "unknown";
}
std::ostream& operator<<(std::ostream& os, sample_format_type const& type) {
@@ -327,6 +381,8 @@ std::istream& operator>>(std::istream& is, sample_format_type& type) {
type = sample_format_type::JSON;
} else if (s == "sql") {
type = sample_format_type::SQL;
+ } else if (s == "rrd") {
+ type = sample_format_type::RRD;
}
return is;
@@ -345,10 +401,24 @@ unique_ptr<SampleStreamParser> open_sample_input_stream(
}
}
+template<typename T>
+o<T *> find_option(vector<sample_output_stream_option *> &options) {
+ for (sample_output_stream_option *& option : options) {
+ T *x = dynamic_cast<T *>(option);
+
+ if (x != nullptr) {
+ return o<T *>(x);
+ }
+ }
+
+ return o<T *>();
+}
+
unique_ptr<SampleOutputStream> open_sample_output_stream(
shared_ptr<ostream> output,
KeyDictionary &dict,
- sample_format_type type) {
+ sample_format_type type,
+ vector<sample_output_stream_option *> options) {
if (type == sample_format_type::CSV) {
return make_unique<CsvSampleOutputStream>(output, dict);
@@ -356,6 +426,14 @@ unique_ptr<SampleOutputStream> open_sample_output_stream(
return make_unique<KeyValueSampleOutputStream>(output, dict);
} else if (type == sample_format_type::JSON) {
return make_unique<JsonSampleOutputStream>(output, dict);
+ } else if (type == sample_format_type::RRD) {
+ o<output_fields *> of = find_option<output_fields>(options);
+
+ o<timestamp_field *> tsf = find_option<timestamp_field>(options);
+
+ auto timestamp_key = dict.indexOf(tsf ? tsf.get()->name : "timestamp");
+
+ return make_unique<RrdSampleOutputStream>(output, dict, timestamp_key, of);
// } else if (type == sample_format_type::SQL) {
// return make_unique<SqlSampleOutputStream>(dict, move(output), table_name);
} else {
diff --git a/apps/SoilMoistureIo.h b/apps/SoilMoistureIo.h
index fac518c..bdb0932 100644
--- a/apps/SoilMoistureIo.h
+++ b/apps/SoilMoistureIo.h
@@ -26,7 +26,8 @@ enum class sample_format_type {
CSV,
KEY_VALUE,
JSON,
- SQL
+ SQL,
+ RRD,
};
string to_string(const sample_format_type &arg);
@@ -48,10 +49,52 @@ unique_ptr<SampleStreamParser> open_sample_input_stream(
KeyDictionary &dict,
sample_format_type type = sample_format_type::AUTO);
+class sample_output_stream_option {
+public:
+ virtual ~sample_output_stream_option() {
+ };
+};
+
+class output_fields : public sample_output_stream_option {
+public:
+// output_fields() {
+// }
+//
+// output_fields(std::vector<string>::iterator begin, std::vector<string>::iterator end) :
+// fields(begin, end) {
+// }
+
+ ~output_fields() {
+ }
+
+ vector<string> fields;
+};
+
+
+class timestamp_field : public sample_output_stream_option {
+public:
+ timestamp_field(string name) : name(name) {
+ }
+
+ ~timestamp_field() {
+ }
+
+ string name;
+};
+
+unique_ptr<SampleOutputStream> open_sample_output_stream(
+ shared_ptr<ostream> output,
+ KeyDictionary &dict,
+ sample_format_type type,
+ vector<sample_output_stream_option *> options);
+
+static inline
unique_ptr<SampleOutputStream> open_sample_output_stream(
shared_ptr<ostream> output,
KeyDictionary &dict,
- sample_format_type type);
+ sample_format_type type) {
+ return open_sample_output_stream(output, dict, type);
+}
class sample_exception : public runtime_error {
public:
@@ -209,7 +252,7 @@ public:
return o<A>(boost::lexical_cast<A>(value.get()));
}
- string to_string() {
+ string to_string() const {
SampleKeyIndex i = 0;
string s;
for (auto ptr = values.begin(); ptr != values.end(); ptr++, i++) {
@@ -285,6 +328,18 @@ private:
shared_ptr<ostream> stream;
};
+class RrdSampleOutputStream : public SampleOutputStream {
+public:
+ RrdSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey *timestamp_key, o<output_fields *> output_fields);
+
+ void write(SampleRecord const &sample) override;
+
+private:
+ vector<SampleKey *> keys;
+ shared_ptr<ostream> stream;
+ const SampleKey *timestamp_key;
+};
+
class SqlSampleOutputStream : public SampleOutputStream {
public:
SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name);
@@ -299,6 +354,7 @@ private:
class SampleStreamParser {
public:
+ // TODO: return number of samples found for progress indication?
virtual void process(mutable_buffers_1 buffer) = 0;
virtual sample_format_type type() {
diff --git a/apps/sample-convert.cpp b/apps/sample-convert.cpp
index c6f5d01..d1c53fb 100644
--- a/apps/sample-convert.cpp
+++ b/apps/sample-convert.cpp
@@ -2,6 +2,7 @@
#include "json.hpp"
#include "apps.h"
#include <fstream>
+#include <boost/tokenizer.hpp>
namespace trygvis {
namespace apps {
@@ -9,9 +10,12 @@ namespace apps {
using namespace std;
using namespace trygvis::apps;
using namespace trygvis::soil_moisture;
+using boost::tokenizer;
namespace po = boost::program_options;
class sample_convert : public app {
+ string fields;
+ string timestamp_field;
public:
void add_options(po::options_description_easy_init &options) override {
@@ -20,7 +24,9 @@ public:
("input", po::value<string>(&input_file)->default_value("-"))
// ("input-format", po::value<string>(&input_format)->default_value("csv"))
("output", po::value<string>(&output_file)->default_value("-"))
- ("output-format", po::value<sample_format_type>(&output_format)->default_value(sample_format_type::KEY_VALUE));
+ ("output-format", po::value<sample_format_type>(&output_format)->default_value(sample_format_type::KEY_VALUE))
+ ("fields", po::value<string>(&fields))
+ ("timestamp-field", po::value<string>(&timestamp_field));
}
void add_extra_options(po::options_description &all_options) override {
@@ -59,7 +65,21 @@ public:
}
}
- shared_ptr<SampleOutputStream> output = open_sample_output_stream(outputStream, dict, output_format);
+ std::vector<sample_output_stream_option *> options;
+ trygvis::soil_moisture::timestamp_field tf(timestamp_field);
+
+ if (!timestamp_field.empty()) {
+ options.push_back(&tf);
+ }
+
+ tokenizer<> tok(fields);
+ output_fields fs;
+ std::copy(tok.begin(), tok.end(), std::back_inserter(fs.fields));
+ if (!fs.fields.empty()) {
+ options.push_back(&fs);
+ }
+
+ shared_ptr <SampleOutputStream> output = open_sample_output_stream(outputStream, dict, output_format, options);
auto input = make_shared<KeyValueSampleParser>(output, dict);
diff --git a/apps/sample-timestamp.cpp b/apps/sample-timestamp.cpp
index 0affd36..0242021 100644
--- a/apps/sample-timestamp.cpp
+++ b/apps/sample-timestamp.cpp
@@ -176,7 +176,8 @@ public:
return EXIT_FAILURE;
}
- unique_ptr<SampleOutputStream> unique_output_stream = open_sample_output_stream(shared_ptr<ostream>(&cout, noop_deleter), dict, parser->type());
+ vector<sample_output_stream_option*> options = {};
+ unique_ptr<SampleOutputStream> unique_output_stream = open_sample_output_stream(shared_ptr<ostream>(&cout, noop_deleter), dict, parser->type(), options);
shared_ptr<SampleOutputStream> output_stream{std::move(unique_output_stream)};
shared_ptr<SampleOutputStream> p = make_shared<TimestampFixingSampleOutputStream>(output_stream, dict, timestamp_name, relative_name, relative_resolution, start_time);
parser = open_sample_input_stream(p, dict, parser->type());