aboutsummaryrefslogtreecommitdiff
path: root/sensor/include/trygvis
diff options
context:
space:
mode:
Diffstat (limited to 'sensor/include/trygvis')
-rw-r--r--sensor/include/trygvis/sensor/io.h125
1 files changed, 57 insertions, 68 deletions
diff --git a/sensor/include/trygvis/sensor/io.h b/sensor/include/trygvis/sensor/io.h
index 304091b..403af9b 100644
--- a/sensor/include/trygvis/sensor/io.h
+++ b/sensor/include/trygvis/sensor/io.h
@@ -4,6 +4,7 @@
#include <mutex>
#include <boost/asio/buffer.hpp>
+#include <utility>
namespace trygvis {
namespace sensor {
@@ -13,47 +14,40 @@ using namespace std;
using namespace boost::asio;
struct sample_output_stream_option {
- virtual ~sample_output_stream_option() {
- };
+ virtual ~sample_output_stream_option() = default;;
};
struct output_fields_option : sample_output_stream_option {
- ~output_fields_option() {
- }
+ ~output_fields_option() override = default;
vector<string> fields;
};
struct timestamp_field_option : sample_output_stream_option {
- timestamp_field_option(const string name) : name(name) {
- }
+ explicit timestamp_field_option(string name) : name(std::move(name)) {}
- ~timestamp_field_option() {
- }
+ ~timestamp_field_option() override = default;
const string name;
};
class table_name_option : public sample_output_stream_option {
public:
- table_name_option(const string name) : name(name) {
- }
+ explicit table_name_option(string name) : name(std::move(name)) {}
- ~table_name_option() {
- }
+ ~table_name_option() override = default;
const string name;
};
class sample_output_stream_options : public vector<sample_output_stream_option *> {
public:
- ~sample_output_stream_options() {
- }
+ ~sample_output_stream_options() = default;
template<typename T>
o<T *> find_option() const {
- for (auto it = begin(); it != end(); ++it) {
- T *x = dynamic_cast<T *>(*it);
+ for (auto it : *this) {
+ auto *x = dynamic_cast<T *>(it);
if (x != nullptr) {
return o<T *>(x);
@@ -65,90 +59,85 @@ public:
};
struct missing_required_option_error : runtime_error {
- missing_required_option_error(string what) : runtime_error(what) {
- }
+ explicit missing_required_option_error(const string &what) : runtime_error(what) {}
- ~missing_required_option_error() {
- }
+ ~missing_required_option_error() override = default;
};
class SampleStreamParser;
-class SampleOutputStream;
+class SampleConsumer;
/**
* Throws missing_required_option_error
*/
unique_ptr<SampleStreamParser> open_sample_stream_parser(
- shared_ptr<SampleOutputStream> output,
+ shared_ptr<SampleConsumer> output,
KeyDictionary &dict,
sample_format_type type = sample_format_type::AUTO);
/**
* Throws missing_required_option_error
*/
-unique_ptr<SampleOutputStream> open_sample_output_stream(
+unique_ptr<SampleConsumer> open_sample_writer(
shared_ptr<ostream> output,
KeyDictionary &dict,
sample_format_type type,
sample_output_stream_options options);
static inline
-unique_ptr<SampleOutputStream> open_sample_output_stream(
+unique_ptr<SampleConsumer> open_sample_output_stream(
shared_ptr<ostream> output,
KeyDictionary &dict,
sample_format_type type) {
sample_output_stream_options options;
- return open_sample_output_stream(output, dict, type, options);
+ return open_sample_writer(std::move(output), dict, type, options);
}
-class SampleOutputStream {
+class SampleConsumer {
public:
- virtual void write(SampleRecord const &sample) = 0;
+ virtual void onSample(SampleRecord const &sample) = 0;
};
-class VectorSampleOutputStream : public SampleOutputStream {
+class VectorSampleOutputStream : public SampleConsumer {
public:
- virtual void write(SampleRecord const &sample) override;
+ void onSample(SampleRecord const &sample) override;
-public:
vector<SampleRecord> samples;
};
-class ThreadSafeSampleOutputStream : public SampleOutputStream {
+class ThreadSafeSampleConsumer : public SampleConsumer {
public:
- ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying);
+ explicit ThreadSafeSampleConsumer(unique_ptr<SampleConsumer> underlying);
- ~ThreadSafeSampleOutputStream() {
- }
+ ~ThreadSafeSampleConsumer() = default;
- void write(SampleRecord const &sample) override;
+ void onSample(SampleRecord const &sample) override;
private:
- unique_ptr<SampleOutputStream> underlying;
+ unique_ptr<SampleConsumer> underlying;
std::mutex mutex;
};
-class AddTimestampSampleOutputStream : public SampleOutputStream {
+class AddTimestampSampleConsumer : public SampleConsumer {
public:
- AddTimestampSampleOutputStream(unique_ptr<SampleOutputStream> underlying, KeyDictionary &dict, const string &timestamp_name);
+ AddTimestampSampleConsumer(unique_ptr<SampleConsumer> underlying, KeyDictionary &dict, const string &timestamp_name);
- ~AddTimestampSampleOutputStream() {
- }
+ ~AddTimestampSampleConsumer() = default;
- void write(SampleRecord const &sample) override;
+ void onSample(SampleRecord const &sample) override;
private:
- unique_ptr<SampleOutputStream> underlying_;
+ unique_ptr<SampleConsumer> underlying_;
const SampleKey* timestamp_key;
};
-class CsvSampleOutputStream : public SampleOutputStream {
+class CsvWriterSampleConsumer : public SampleConsumer {
public:
- CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict);
+ CsvWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict);
- void write(SampleRecord const &sample);
+ void onSample(SampleRecord const &sample) override;
private:
void writeHeader();
@@ -158,33 +147,33 @@ private:
bool headerWritten;
};
-class JsonSampleOutputStream : public SampleOutputStream {
+class JsonWriterSampleConsumer : public SampleConsumer {
public:
- JsonSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict);
+ JsonWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict);
- void write(SampleRecord const &sample) override;
+ void onSample(SampleRecord const &sample) override;
private:
KeyDictionary &dict;
shared_ptr<ostream> stream;
};
-class KeyValueSampleOutputStream : public SampleOutputStream {
+class KeyValueWriterSampleConsumer : public SampleConsumer {
public:
- KeyValueSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict);
+ KeyValueWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict);
- void write(SampleRecord const &sample) override;
+ void onSample(SampleRecord const &sample) override;
private:
KeyDictionary &dict;
shared_ptr<ostream> stream;
};
-class RrdSampleOutputStream : public SampleOutputStream {
+class RrdWriterSampleConsumer : public SampleConsumer {
public:
- RrdSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey *timestamp_key, o<output_fields_option *> output_fields);
+ RrdWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey *timestamp_key, o<output_fields_option *> output_fields);
- void write(SampleRecord const &sample) override;
+ void onSample(SampleRecord const &sample) override;
private:
vector<const SampleKey *> keys;
@@ -192,11 +181,11 @@ private:
const SampleKey *timestamp_key;
};
-class SqlSampleOutputStream : public SampleOutputStream {
+class SqlWriterSampleConsumer : public SampleConsumer {
public:
- SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name);
+ SqlWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name);
- void write(SampleRecord const &sample) override;
+ void onSample(SampleRecord const &sample) override;
private:
KeyDictionary &dict;
@@ -235,8 +224,8 @@ protected:
class KeyValueSampleStreamParser : public SampleStreamParser {
public:
- KeyValueSampleStreamParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) :
- SampleStreamParser(sample_format_type::CSV, dict), output(output),
+ KeyValueSampleStreamParser(shared_ptr<SampleConsumer> output, KeyDictionary &dict) :
+ SampleStreamParser(sample_format_type::CSV, dict), output(std::move(output)),
line(make_shared<vector<uint8_t>>()) {
}
@@ -248,17 +237,17 @@ private:
void process_line(shared_ptr<vector<uint8_t>> &packet);
static const uint8_t packet_delimiter = '\n';
- shared_ptr<SampleOutputStream> output;
+ shared_ptr<SampleConsumer> output;
shared_ptr<vector<uint8_t>> line;
};
class AutoSampleParser : public SampleStreamParser {
public:
- AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict);
+ AutoSampleParser(shared_ptr<SampleConsumer> output, KeyDictionary &dict);
- virtual int process(mutable_buffers_1 &buffer) override;
+ int process(mutable_buffers_1 &buffer) override;
- virtual int finish() override;
+ int finish() override;
private:
unique_ptr<SampleStreamParser> parser;
@@ -266,10 +255,10 @@ private:
};
static inline
-unique_ptr<ThreadSafeSampleOutputStream> thread_safe_sample_output_stream(unique_ptr<SampleOutputStream> underlying) {
- return make_unique<ThreadSafeSampleOutputStream>(move(underlying));
+unique_ptr<ThreadSafeSampleConsumer> thread_safe_sample_output_stream(unique_ptr<SampleConsumer> underlying) {
+ return make_unique<ThreadSafeSampleConsumer>(move(underlying));
};
-}
-}
-}
+} // namespace io
+} // namespace sensor
+} // namespace trygvis