aboutsummaryrefslogtreecommitdiff
path: root/sensor
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2017-09-01 13:59:25 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2017-09-01 14:06:01 +0200
commit1f2c7aae9fdd39a478944ccda5c9e82d76ab5db6 (patch)
tree691f851318c60465dd5d4c837136e991026432ff /sensor
parent3af4d83dfa190d8d95aefbf31ae0b1d85abe492a (diff)
downloadble-toys-1f2c7aae9fdd39a478944ccda5c9e82d76ab5db6.tar.gz
ble-toys-1f2c7aae9fdd39a478944ccda5c9e82d76ab5db6.tar.bz2
ble-toys-1f2c7aae9fdd39a478944ccda5c9e82d76ab5db6.tar.xz
ble-toys-1f2c7aae9fdd39a478944ccda5c9e82d76ab5db6.zip
o Renaming SampleOutputStream to SampleConsumer.
o Some c++ style fixes.
Diffstat (limited to 'sensor')
-rw-r--r--sensor/include/trygvis/sensor/io.h125
-rw-r--r--sensor/main/io.cpp56
2 files changed, 85 insertions, 96 deletions
diff --git a/sensor/include/trygvis/sensor/io.h b/sensor/include/trygvis/sensor/io.h
index 304091b..403af9b 100644
--- a/sensor/include/trygvis/sensor/io.h
+++ b/sensor/include/trygvis/sensor/io.h
@@ -4,6 +4,7 @@
#include <mutex>
#include <boost/asio/buffer.hpp>
+#include <utility>
namespace trygvis {
namespace sensor {
@@ -13,47 +14,40 @@ using namespace std;
using namespace boost::asio;
struct sample_output_stream_option {
- virtual ~sample_output_stream_option() {
- };
+ virtual ~sample_output_stream_option() = default;;
};
struct output_fields_option : sample_output_stream_option {
- ~output_fields_option() {
- }
+ ~output_fields_option() override = default;
vector<string> fields;
};
struct timestamp_field_option : sample_output_stream_option {
- timestamp_field_option(const string name) : name(name) {
- }
+ explicit timestamp_field_option(string name) : name(std::move(name)) {}
- ~timestamp_field_option() {
- }
+ ~timestamp_field_option() override = default;
const string name;
};
class table_name_option : public sample_output_stream_option {
public:
- table_name_option(const string name) : name(name) {
- }
+ explicit table_name_option(string name) : name(std::move(name)) {}
- ~table_name_option() {
- }
+ ~table_name_option() override = default;
const string name;
};
class sample_output_stream_options : public vector<sample_output_stream_option *> {
public:
- ~sample_output_stream_options() {
- }
+ ~sample_output_stream_options() = default;
template<typename T>
o<T *> find_option() const {
- for (auto it = begin(); it != end(); ++it) {
- T *x = dynamic_cast<T *>(*it);
+ for (auto it : *this) {
+ auto *x = dynamic_cast<T *>(it);
if (x != nullptr) {
return o<T *>(x);
@@ -65,90 +59,85 @@ public:
};
struct missing_required_option_error : runtime_error {
- missing_required_option_error(string what) : runtime_error(what) {
- }
+ explicit missing_required_option_error(const string &what) : runtime_error(what) {}
- ~missing_required_option_error() {
- }
+ ~missing_required_option_error() override = default;
};
class SampleStreamParser;
-class SampleOutputStream;
+class SampleConsumer;
/**
* Throws missing_required_option_error
*/
unique_ptr<SampleStreamParser> open_sample_stream_parser(
- shared_ptr<SampleOutputStream> output,
+ shared_ptr<SampleConsumer> output,
KeyDictionary &dict,
sample_format_type type = sample_format_type::AUTO);
/**
* Throws missing_required_option_error
*/
-unique_ptr<SampleOutputStream> open_sample_output_stream(
+unique_ptr<SampleConsumer> open_sample_writer(
shared_ptr<ostream> output,
KeyDictionary &dict,
sample_format_type type,
sample_output_stream_options options);
static inline
-unique_ptr<SampleOutputStream> open_sample_output_stream(
+unique_ptr<SampleConsumer> open_sample_output_stream(
shared_ptr<ostream> output,
KeyDictionary &dict,
sample_format_type type) {
sample_output_stream_options options;
- return open_sample_output_stream(output, dict, type, options);
+ return open_sample_writer(std::move(output), dict, type, options);
}
-class SampleOutputStream {
+class SampleConsumer {
public:
- virtual void write(SampleRecord const &sample) = 0;
+ virtual void onSample(SampleRecord const &sample) = 0;
};
-class VectorSampleOutputStream : public SampleOutputStream {
+class VectorSampleOutputStream : public SampleConsumer {
public:
- virtual void write(SampleRecord const &sample) override;
+ void onSample(SampleRecord const &sample) override;
-public:
vector<SampleRecord> samples;
};
-class ThreadSafeSampleOutputStream : public SampleOutputStream {
+class ThreadSafeSampleConsumer : public SampleConsumer {
public:
- ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying);
+ explicit ThreadSafeSampleConsumer(unique_ptr<SampleConsumer> underlying);
- ~ThreadSafeSampleOutputStream() {
- }
+ ~ThreadSafeSampleConsumer() = default;
- void write(SampleRecord const &sample) override;
+ void onSample(SampleRecord const &sample) override;
private:
- unique_ptr<SampleOutputStream> underlying;
+ unique_ptr<SampleConsumer> underlying;
std::mutex mutex;
};
-class AddTimestampSampleOutputStream : public SampleOutputStream {
+class AddTimestampSampleConsumer : public SampleConsumer {
public:
- AddTimestampSampleOutputStream(unique_ptr<SampleOutputStream> underlying, KeyDictionary &dict, const string &timestamp_name);
+ AddTimestampSampleConsumer(unique_ptr<SampleConsumer> underlying, KeyDictionary &dict, const string &timestamp_name);
- ~AddTimestampSampleOutputStream() {
- }
+ ~AddTimestampSampleConsumer() = default;
- void write(SampleRecord const &sample) override;
+ void onSample(SampleRecord const &sample) override;
private:
- unique_ptr<SampleOutputStream> underlying_;
+ unique_ptr<SampleConsumer> underlying_;
const SampleKey* timestamp_key;
};
-class CsvSampleOutputStream : public SampleOutputStream {
+class CsvWriterSampleConsumer : public SampleConsumer {
public:
- CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict);
+ CsvWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict);
- void write(SampleRecord const &sample);
+ void onSample(SampleRecord const &sample) override;
private:
void writeHeader();
@@ -158,33 +147,33 @@ private:
bool headerWritten;
};
-class JsonSampleOutputStream : public SampleOutputStream {
+class JsonWriterSampleConsumer : public SampleConsumer {
public:
- JsonSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict);
+ JsonWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict);
- void write(SampleRecord const &sample) override;
+ void onSample(SampleRecord const &sample) override;
private:
KeyDictionary &dict;
shared_ptr<ostream> stream;
};
-class KeyValueSampleOutputStream : public SampleOutputStream {
+class KeyValueWriterSampleConsumer : public SampleConsumer {
public:
- KeyValueSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict);
+ KeyValueWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict);
- void write(SampleRecord const &sample) override;
+ void onSample(SampleRecord const &sample) override;
private:
KeyDictionary &dict;
shared_ptr<ostream> stream;
};
-class RrdSampleOutputStream : public SampleOutputStream {
+class RrdWriterSampleConsumer : public SampleConsumer {
public:
- RrdSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey *timestamp_key, o<output_fields_option *> output_fields);
+ RrdWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey *timestamp_key, o<output_fields_option *> output_fields);
- void write(SampleRecord const &sample) override;
+ void onSample(SampleRecord const &sample) override;
private:
vector<const SampleKey *> keys;
@@ -192,11 +181,11 @@ private:
const SampleKey *timestamp_key;
};
-class SqlSampleOutputStream : public SampleOutputStream {
+class SqlWriterSampleConsumer : public SampleConsumer {
public:
- SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name);
+ SqlWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name);
- void write(SampleRecord const &sample) override;
+ void onSample(SampleRecord const &sample) override;
private:
KeyDictionary &dict;
@@ -235,8 +224,8 @@ protected:
class KeyValueSampleStreamParser : public SampleStreamParser {
public:
- KeyValueSampleStreamParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) :
- SampleStreamParser(sample_format_type::CSV, dict), output(output),
+ KeyValueSampleStreamParser(shared_ptr<SampleConsumer> output, KeyDictionary &dict) :
+ SampleStreamParser(sample_format_type::CSV, dict), output(std::move(output)),
line(make_shared<vector<uint8_t>>()) {
}
@@ -248,17 +237,17 @@ private:
void process_line(shared_ptr<vector<uint8_t>> &packet);
static const uint8_t packet_delimiter = '\n';
- shared_ptr<SampleOutputStream> output;
+ shared_ptr<SampleConsumer> output;
shared_ptr<vector<uint8_t>> line;
};
class AutoSampleParser : public SampleStreamParser {
public:
- AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict);
+ AutoSampleParser(shared_ptr<SampleConsumer> output, KeyDictionary &dict);
- virtual int process(mutable_buffers_1 &buffer) override;
+ int process(mutable_buffers_1 &buffer) override;
- virtual int finish() override;
+ int finish() override;
private:
unique_ptr<SampleStreamParser> parser;
@@ -266,10 +255,10 @@ private:
};
static inline
-unique_ptr<ThreadSafeSampleOutputStream> thread_safe_sample_output_stream(unique_ptr<SampleOutputStream> underlying) {
- return make_unique<ThreadSafeSampleOutputStream>(move(underlying));
+unique_ptr<ThreadSafeSampleConsumer> thread_safe_sample_output_stream(unique_ptr<SampleConsumer> underlying) {
+ return make_unique<ThreadSafeSampleConsumer>(move(underlying));
};
-}
-}
-}
+} // namespace io
+} // namespace sensor
+} // namespace trygvis
diff --git a/sensor/main/io.cpp b/sensor/main/io.cpp
index 9e822c3..a0dcdc9 100644
--- a/sensor/main/io.cpp
+++ b/sensor/main/io.cpp
@@ -16,7 +16,7 @@ using boost::escaped_list_separator;
using json = nlohmann::json;
unique_ptr<SampleStreamParser> open_sample_stream_parser(
- shared_ptr<SampleOutputStream> output,
+ shared_ptr<SampleConsumer> output,
KeyDictionary &dict,
sample_format_type type) {
if (type == sample_format_type::KEY_VALUE) {
@@ -28,18 +28,18 @@ unique_ptr<SampleStreamParser> open_sample_stream_parser(
}
}
-unique_ptr<SampleOutputStream> open_sample_output_stream(
+unique_ptr<SampleConsumer> open_sample_writer(
shared_ptr<ostream> output,
KeyDictionary &dict,
sample_format_type type,
sample_output_stream_options options) {
if (type == sample_format_type::CSV) {
- return make_unique<CsvSampleOutputStream>(output, dict);
+ return make_unique<CsvWriterSampleConsumer>(output, dict);
} else if (type == sample_format_type::KEY_VALUE) {
- return make_unique<KeyValueSampleOutputStream>(output, dict);
+ return make_unique<KeyValueWriterSampleConsumer>(output, dict);
} else if (type == sample_format_type::JSON) {
- return make_unique<JsonSampleOutputStream>(output, dict);
+ return make_unique<JsonWriterSampleConsumer>(output, dict);
} else if (type == sample_format_type::RRD) {
auto of = options.find_option<output_fields_option>();
@@ -47,7 +47,7 @@ unique_ptr<SampleOutputStream> open_sample_output_stream(
auto timestamp_key = dict.indexOf(tsf ? tsf.value()->name : "timestamp");
- return make_unique<RrdSampleOutputStream>(output, dict, timestamp_key, of);
+ return make_unique<RrdWriterSampleConsumer>(output, dict, timestamp_key, of);
} else if (type == sample_format_type::SQL) {
auto tno = options.find_option<table_name_option>();
@@ -55,30 +55,30 @@ unique_ptr<SampleOutputStream> open_sample_output_stream(
throw missing_required_option_error("table name");
}
- return make_unique<SqlSampleOutputStream>(move(output), dict, tno.value()->name);
+ return make_unique<SqlWriterSampleConsumer>(move(output), dict, tno.value()->name);
} else {
throw sample_exception("No writer for format type: " + to_string(type));
}
}
-ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying)
+ThreadSafeSampleConsumer::ThreadSafeSampleConsumer(unique_ptr<SampleConsumer> underlying)
: underlying(move(underlying)) {
}
-void ThreadSafeSampleOutputStream::write(SampleRecord const &sample) {
+void ThreadSafeSampleConsumer::onSample(SampleRecord const &sample) {
std::unique_lock<std::mutex> lock(mutex);
- underlying->write(sample);
+ underlying->onSample(sample);
}
-AddTimestampSampleOutputStream::AddTimestampSampleOutputStream(unique_ptr<SampleOutputStream> underlying,
+AddTimestampSampleConsumer::AddTimestampSampleConsumer(unique_ptr<SampleConsumer> underlying,
KeyDictionary &dict,
const string &timestamp_name) : underlying_(move(underlying)), timestamp_key(dict.indexOf(timestamp_name)) {
}
-void AddTimestampSampleOutputStream::write(SampleRecord const &sample) {
+void AddTimestampSampleConsumer::onSample(SampleRecord const &sample) {
if (sample.at(timestamp_key)) {
- underlying_->write(sample);
+ underlying_->onSample(sample);
return;
}
@@ -88,10 +88,10 @@ void AddTimestampSampleOutputStream::write(SampleRecord const &sample) {
SampleRecord copy = sample;
copy.set(timestamp_key, timestamp_s);
- underlying_->write(copy);
+ underlying_->onSample(copy);
}
-void VectorSampleOutputStream::write(SampleRecord const &sample) {
+void VectorSampleOutputStream::onSample(SampleRecord const &sample) {
if (sample.empty()) {
return;
}
@@ -99,11 +99,11 @@ void VectorSampleOutputStream::write(SampleRecord const &sample) {
samples.emplace_back(sample);
}
-CsvSampleOutputStream::CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict)
+CsvWriterSampleConsumer::CsvWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict)
: stream(move(stream)), headerWritten(false), dict(dict) {
}
-void CsvSampleOutputStream::write(SampleRecord const &sample) {
+void CsvWriterSampleConsumer::onSample(SampleRecord const &sample) {
// Skip empty records
if (sample.empty()) {
return;
@@ -151,7 +151,7 @@ void CsvSampleOutputStream::write(SampleRecord const &sample) {
s << endl << flush;
}
-void CsvSampleOutputStream::writeHeader() {
+void CsvWriterSampleConsumer::writeHeader() {
auto &s = *stream;
auto i = dict.begin();
@@ -168,11 +168,11 @@ void CsvSampleOutputStream::writeHeader() {
s << endl << flush;
}
-JsonSampleOutputStream::JsonSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict) :
+JsonWriterSampleConsumer::JsonWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict) :
dict(dict), stream(move(stream)) {
}
-void JsonSampleOutputStream::write(SampleRecord const &sample) {
+void JsonWriterSampleConsumer::onSample(SampleRecord const &sample) {
// Skip empty records
if (sample.empty()) {
return;
@@ -205,11 +205,11 @@ void JsonSampleOutputStream::write(SampleRecord const &sample) {
*stream.get() << doc << endl << flush;
}
-KeyValueSampleOutputStream::KeyValueSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict) :
+KeyValueWriterSampleConsumer::KeyValueWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict) :
dict(dict), stream(move(stream)) {
}
-void KeyValueSampleOutputStream::write(SampleRecord const &sample) {
+void KeyValueWriterSampleConsumer::onSample(SampleRecord const &sample) {
// Skip empty records
if (sample.empty()) {
return;
@@ -253,7 +253,7 @@ void KeyValueSampleOutputStream::write(SampleRecord const &sample) {
*s << endl << flush;
}
-RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr<ostream> stream,
+RrdWriterSampleConsumer::RrdWriterSampleConsumer(shared_ptr<ostream> stream,
KeyDictionary &dict,
const SampleKey *timestamp_key,
o<output_fields_option *> output_fields) :
@@ -270,7 +270,7 @@ RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr<ostream> stream,
}
}
-void RrdSampleOutputStream::write(SampleRecord const &sample) {
+void RrdWriterSampleConsumer::onSample(SampleRecord const &sample) {
// Skip empty records
if (sample.empty()) {
return;
@@ -309,11 +309,11 @@ void RrdSampleOutputStream::write(SampleRecord const &sample) {
s << endl << flush;
}
-SqlSampleOutputStream::SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name) :
+SqlWriterSampleConsumer::SqlWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name) :
dict(dict), stream(move(stream)), table_name(table_name) {
}
-void SqlSampleOutputStream::write(SampleRecord const &sample) {
+void SqlWriterSampleConsumer::onSample(SampleRecord const &sample) {
string fs, vs;
fs.reserve(1024);
@@ -425,10 +425,10 @@ void KeyValueSampleStreamParser::process_line(shared_ptr<vector<uint8_t>> &packe
sample.set(key, value);
}
- output->write(sample);
+ output->onSample(sample);
}
-AutoSampleParser::AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) :
+AutoSampleParser::AutoSampleParser(shared_ptr<SampleConsumer> output, KeyDictionary &dict) :
SampleStreamParser(sample_format_type::AUTO, dict), keyValueParser(new KeyValueSampleStreamParser(output, dict)) {
// Directly select the parser now until we have more than one parser
parser = std::move(keyValueParser);