aboutsummaryrefslogtreecommitdiff
path: root/sensor
diff options
context:
space:
mode:
Diffstat (limited to 'sensor')
-rw-r--r--sensor/include/trygvis/sensor.h3
-rw-r--r--sensor/include/trygvis/sensor/io.h86
-rw-r--r--sensor/main/io.cpp186
3 files changed, 155 insertions, 120 deletions
diff --git a/sensor/include/trygvis/sensor.h b/sensor/include/trygvis/sensor.h
index 4662bab..f8cfbe5 100644
--- a/sensor/include/trygvis/sensor.h
+++ b/sensor/include/trygvis/sensor.h
@@ -148,6 +148,9 @@ public:
: dict(dict), values(values) {
}
+ SampleRecord(const SampleRecord &copy) : dict(copy.dict), values(copy.values) {
+ }
+
inline
vec::const_iterator cbegin() const {
return values.cbegin();
diff --git a/sensor/include/trygvis/sensor/io.h b/sensor/include/trygvis/sensor/io.h
index f92b800..f86f2d9 100644
--- a/sensor/include/trygvis/sensor/io.h
+++ b/sensor/include/trygvis/sensor/io.h
@@ -12,29 +12,37 @@ namespace io {
using namespace std;
using namespace boost::asio;
-class sample_output_stream_option {
-public:
+struct sample_output_stream_option {
virtual ~sample_output_stream_option() {
};
};
-class output_fields_option : public sample_output_stream_option {
-public:
+struct output_fields_option : sample_output_stream_option {
~output_fields_option() {
}
vector<string> fields;
};
-class timestamp_field_option : public sample_output_stream_option {
-public:
- timestamp_field_option(string name) : name(name) {
+struct timestamp_field_option : sample_output_stream_option {
+ timestamp_field_option(const string name) : name(name) {
}
~timestamp_field_option() {
}
- string name;
+ const string name;
+};
+
+class table_name_option : public sample_output_stream_option {
+public:
+ table_name_option(const string name) : name(name) {
+ }
+
+ ~table_name_option() {
+ }
+
+ const string name;
};
class sample_output_stream_options : public vector<sample_output_stream_option *> {
@@ -56,6 +64,44 @@ public:
}
};
+struct missing_required_option_error : runtime_error {
+ missing_required_option_error(string what) : runtime_error(what) {
+ }
+
+ ~missing_required_option_error() {
+ }
+};
+
+class SampleStreamParser;
+
+class SampleOutputStream;
+
+/**
+ * Throws missing_required_option_error
+ */
+unique_ptr<SampleStreamParser> open_sample_stream_parser(
+ shared_ptr<SampleOutputStream> output,
+ KeyDictionary &dict,
+ sample_format_type type = sample_format_type::AUTO);
+
+/**
+ * Throws missing_required_option_error
+ */
+unique_ptr<SampleOutputStream> open_sample_output_stream(
+ shared_ptr<ostream> output,
+ KeyDictionary &dict,
+ sample_format_type type,
+ sample_output_stream_options options);
+
+static inline
+unique_ptr<SampleOutputStream> open_sample_output_stream(
+ shared_ptr<ostream> output,
+ KeyDictionary &dict,
+ sample_format_type type) {
+ sample_output_stream_options options;
+ return open_sample_output_stream(output, dict, type, options);
+}
+
class SampleOutputStream {
public:
virtual void write(SampleRecord const &sample) = 0;
@@ -104,10 +150,6 @@ public:
void write(SampleRecord const &sample);
- const KeyDictionary &getDict() {
- return dict;
- }
-
private:
void writeHeader();
@@ -212,26 +254,6 @@ private:
unique_ptr<KeyValueSampleStreamParser> keyValueParser;
};
-unique_ptr<SampleStreamParser> open_sample_stream_parser(
- shared_ptr<SampleOutputStream> output,
- KeyDictionary &dict,
- sample_format_type type = sample_format_type::AUTO);
-
-unique_ptr<SampleOutputStream> open_sample_output_stream(
- shared_ptr<ostream> output,
- KeyDictionary &dict,
- sample_format_type type,
- sample_output_stream_options options);
-
-static inline
-unique_ptr<SampleOutputStream> open_sample_output_stream(
- shared_ptr<ostream> output,
- KeyDictionary &dict,
- sample_format_type type) {
- sample_output_stream_options options;
- return open_sample_output_stream(output, dict, type, options);
-}
-
static inline
unique_ptr<ThreadSafeSampleOutputStream> thread_safe_sample_output_stream(unique_ptr<SampleOutputStream> underlying) {
return make_unique<ThreadSafeSampleOutputStream>(move(underlying));
diff --git a/sensor/main/io.cpp b/sensor/main/io.cpp
index e73b9d4..255b4f5 100644
--- a/sensor/main/io.cpp
+++ b/sensor/main/io.cpp
@@ -1,7 +1,6 @@
#include "trygvis/sensor/io.h"
#include <map>
-#include <chrono>
#include "json.hpp"
#include "boost/tokenizer.hpp"
#include <boost/algorithm/string.hpp>
@@ -16,6 +15,52 @@ using boost::tokenizer;
using boost::escaped_list_separator;
using json = nlohmann::json;
+unique_ptr<SampleStreamParser> open_sample_stream_parser(
+ shared_ptr<SampleOutputStream> output,
+ KeyDictionary &dict,
+ sample_format_type type) {
+ if (type == sample_format_type::KEY_VALUE) {
+ return make_unique<KeyValueSampleStreamParser>(output, dict);
+ } else if (type == sample_format_type::AUTO) {
+ return make_unique<AutoSampleParser>(output, dict);
+ } else {
+ throw sample_exception("No parser for format type: " + to_string(type));
+ }
+}
+
+unique_ptr<SampleOutputStream> open_sample_output_stream(
+ shared_ptr<ostream> output,
+ KeyDictionary &dict,
+ sample_format_type type,
+ sample_output_stream_options options) {
+
+ if (type == sample_format_type::CSV) {
+ return make_unique<CsvSampleOutputStream>(output, dict);
+ } else if (type == sample_format_type::KEY_VALUE) {
+ return make_unique<KeyValueSampleOutputStream>(output, dict);
+ } else if (type == sample_format_type::JSON) {
+ return make_unique<JsonSampleOutputStream>(output, dict);
+ } else if (type == sample_format_type::RRD) {
+ auto of = options.find_option<output_fields_option>();
+
+ auto tsf = options.find_option<timestamp_field_option>();
+
+ auto timestamp_key = dict.indexOf(tsf ? tsf.get()->name : "timestamp");
+
+ return make_unique<RrdSampleOutputStream>(output, dict, timestamp_key, of);
+ } else if (type == sample_format_type::SQL) {
+ auto tno = options.find_option<table_name_option>();
+
+ if (!tno.is_initialized()) {
+ throw missing_required_option_error("table name");
+ }
+
+ return make_unique<SqlSampleOutputStream>(move(output), dict, tno.get()->name);
+ } else {
+ throw sample_exception("No writer for format type: " + to_string(type));
+ }
+}
+
ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying)
: underlying(move(underlying)) {
}
@@ -33,6 +78,11 @@ AddTimestampSampleOutputStream::AddTimestampSampleOutputStream(unique_ptr<Sample
}
void AddTimestampSampleOutputStream::write(SampleRecord const &sample) {
+ if (sample.at(timestamp_key)) {
+ underlying_->write(sample);
+ return;
+ }
+
auto time_since_epoch = system_clock::now().time_since_epoch();
auto timestamp = duration_cast<seconds>(time_since_epoch).count();
auto timestamp_s = std::to_string(timestamp);
@@ -264,53 +314,53 @@ SqlSampleOutputStream::SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDict
dict(dict), stream(move(stream)), table_name(table_name) {
}
-void SqlSampleOutputStream::write(SampleRecord const &values) {
- throw sample_exception("deimplemented");
-
-// string fs, vs;
-//
-// fs.reserve(1024);
-// vs.reserve(1024);
-//
-// if (filter_fields) {
-// auto i = fields.begin();
-//
-// while (i != fields.end()) {
-// auto field = *i;
-//
-// fs += field;
-//
-// auto value = values.find(field);
-//
-// if (value != values.end()) {
-// vs += "'" + value->second + "'";
-// } else {
-// vs += "NULL";
-// }
-//
-// i++;
-//
-// if (i != fields.end()) {
-// fs += ",";
-// vs += ",";
-// }
-// }
-// } else {
-// auto i = values.begin();
-// while (i != values.end()) {
-// auto v = *i++;
-//
-// fs += v.first;
-// vs += "'" + v.second + "'";
-//
-// if (i != values.end()) {
-// fs += ",";
-// vs += ",";
-// }
-// }
-// }
-//
-// (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl << flush;
+void SqlSampleOutputStream::write(SampleRecord const &sample) {
+ string fs, vs;
+
+ fs.reserve(1024);
+ vs.reserve(1024);
+
+ auto &s = *stream.get();
+
+ bool first = true;
+ if (!dict.empty()) {
+ for (auto &key: dict) {
+ auto sample_key = sample.dict.indexOf(key->name);
+
+ auto value = sample.at(sample_key);
+
+ if (value) {
+ if (first) {
+ first = false;
+ } else {
+ fs += ", ";
+ vs += ", ";
+ }
+ fs += key->name;
+ vs += value.get();
+ }
+ }
+ } else {
+ for (auto &sample_key: sample.dict) {
+ auto o = sample.at(sample_key);
+
+ if (o) {
+ if (first) {
+ first = false;
+ } else {
+ fs += ", ";
+ vs += ", ";
+ }
+ // Make sure that the key is registered in the dictionary
+ dict.indexOf(sample_key->name);
+
+ fs += sample_key->name;
+ vs += o.get();
+ }
+ }
+ }
+
+ s << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl << flush;
}
void KeyValueSampleStreamParser::process(mutable_buffers_1 &buffer) {
@@ -380,46 +430,6 @@ void AutoSampleParser::process(mutable_buffers_1 &buffer) {
}
}
-unique_ptr<SampleStreamParser> open_sample_stream_parser(
- shared_ptr<SampleOutputStream> output,
- KeyDictionary &dict,
- sample_format_type type) {
- if (type == sample_format_type::KEY_VALUE) {
- return make_unique<KeyValueSampleStreamParser>(output, dict);
- } else if (type == sample_format_type::AUTO) {
- return make_unique<AutoSampleParser>(output, dict);
- } else {
- throw sample_exception("No parser for format type: " + to_string(type));
- }
-}
-
-unique_ptr<SampleOutputStream> open_sample_output_stream(
- shared_ptr<ostream> output,
- KeyDictionary &dict,
- sample_format_type type,
- sample_output_stream_options options) {
-
- if (type == sample_format_type::CSV) {
- return make_unique<CsvSampleOutputStream>(output, dict);
- } else if (type == sample_format_type::KEY_VALUE) {
- return make_unique<KeyValueSampleOutputStream>(output, dict);
- } else if (type == sample_format_type::JSON) {
- return make_unique<JsonSampleOutputStream>(output, dict);
- } else if (type == sample_format_type::RRD) {
- o<output_fields_option *> of = options.find_option<output_fields_option>();
-
- o<timestamp_field_option *> tsf = options.find_option<timestamp_field_option>();
-
- auto timestamp_key = dict.indexOf(tsf ? tsf.get()->name : "timestamp");
-
- return make_unique<RrdSampleOutputStream>(output, dict, timestamp_key, of);
-// } else if (type == sample_format_type::SQL) {
-// return make_unique<SqlSampleOutputStream>(dict, move(output), table_name);
- } else {
- throw sample_exception("No writer for format type: " + to_string(type));
- }
-}
-
}
}
} \ No newline at end of file