aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2017-09-01 13:59:25 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2017-09-01 14:06:01 +0200
commit1f2c7aae9fdd39a478944ccda5c9e82d76ab5db6 (patch)
tree691f851318c60465dd5d4c837136e991026432ff
parent3af4d83dfa190d8d95aefbf31ae0b1d85abe492a (diff)
downloadble-toys-1f2c7aae9fdd39a478944ccda5c9e82d76ab5db6.tar.gz
ble-toys-1f2c7aae9fdd39a478944ccda5c9e82d76ab5db6.tar.bz2
ble-toys-1f2c7aae9fdd39a478944ccda5c9e82d76ab5db6.tar.xz
ble-toys-1f2c7aae9fdd39a478944ccda5c9e82d76ab5db6.zip
o Renaming SampleOutputStream to SampleConsumer.
o Some c++ style fixes.
-rw-r--r--.gitignore1
-rw-r--r--CMakeLists.txt21
-rw-r--r--apps/apps.h10
-rw-r--r--apps/mqtt-publish.cpp10
-rw-r--r--apps/sample-add-timestamp.cpp33
-rw-r--r--apps/sample-convert.cpp6
-rw-r--r--apps/sample-select.cpp6
-rw-r--r--apps/sm-get-value.cpp4
-rw-r--r--apps/sm-serial-read-all.cpp2
-rw-r--r--apps/sm-serial-read.cpp2
-rw-r--r--sensor/include/trygvis/sensor/io.h125
-rw-r--r--sensor/main/io.cpp56
12 files changed, 131 insertions, 145 deletions
diff --git a/.gitignore b/.gitignore
index 74c7978..f8bef31 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,3 +5,4 @@ build
*.tmp.*
tmp.*
*.tmp
+cmake-build-*
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 9da7eb6..8c64151 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -7,11 +7,18 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14")
set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -DDEBUG")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -DDEBUG")
-IF(NOT CMAKE_BUILD_TYPE)
- SET(CMAKE_BUILD_TYPE Debug CACHE STRING
- "Choose the type of build, options are: None Debug Release RelWithDebInfo MinSizeRel."
- FORCE)
-ENDIF(NOT CMAKE_BUILD_TYPE)
+if (NOT CMAKE_BUILD_TYPE)
+ SET(CMAKE_BUILD_TYPE Debug CACHE STRING
+ "Choose the type of build, options are: None Debug Release RelWithDebInfo MinSizeRel."
+ FORCE)
+endif (NOT CMAKE_BUILD_TYPE)
+
+if (CMAKE_C_COMPILER_ID STREQUAL "GNU" OR CMAKE_C_COMPILER_ID STREQUAL "Clang")
+ set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -flto")
+endif ()
+if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU" OR CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -flto")
+endif ()
# Boost
set(Boost_USE_STATIC_LIBS OFF)
@@ -37,8 +44,8 @@ if (DOXYGEN_FOUND)
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}
COMMENT "Generating documentation" VERBATIM)
- add_custom_target(doc DEPENDS ${doxy_html_index_file} )
-endif(DOXYGEN_FOUND)
+ add_custom_target(doc DEPENDS ${doxy_html_index_file})
+endif (DOXYGEN_FOUND)
add_subdirectory(ble)
add_subdirectory(apps)
diff --git a/apps/apps.h b/apps/apps.h
index 9173ae0..21a25a3 100644
--- a/apps/apps.h
+++ b/apps/apps.h
@@ -126,12 +126,6 @@ std::string get_hostname();
int real_main(app *app, int argc, const char *argv[]);
-template<typename T>
-class noop_delete {
-public:
- void operator()(T *) const { }
-};
-
static inline void noop_deleter(void *) { }
-}
-}
+} // namespace apps
+} // namespace trygvis
diff --git a/apps/mqtt-publish.cpp b/apps/mqtt-publish.cpp
index 25e8ae9..1498b73 100644
--- a/apps/mqtt-publish.cpp
+++ b/apps/mqtt-publish.cpp
@@ -22,12 +22,12 @@ using namespace trygvis::sensor;
using namespace trygvis::sensor::io;
using namespace trygvis::mqtt_support;
-class MqttSampleOutputStream : public SampleOutputStream {
+class MqttSampleOutputStream : public SampleConsumer {
public:
MqttSampleOutputStream(const o<string> &client_id, bool clean_session, string host, unsigned int port,
string topic_name,
unsigned int keep_alive)
- : SampleOutputStream(),
+ : SampleConsumer(),
client(host, port, keep_alive, client_id, clean_session),
topic_name(topic_name) {
client.connect();
@@ -37,15 +37,15 @@ public:
client.disconnect();
};
- void write(SampleRecord const &sample) override {
+ void onSample(SampleRecord const &sample) override {
if (sample.empty()) {
return;
}
// make a string of the sample
auto buf = make_shared<stringstream>();
- KeyValueSampleOutputStream out(buf, sample.dict);
- out.write(sample);
+ KeyValueWriterSampleConsumer out(buf, sample.dict);
+ out.onSample(sample);
string s = buf->str();
cout << "sample: " << s;
diff --git a/apps/sample-add-timestamp.cpp b/apps/sample-add-timestamp.cpp
index 8bc4bac..841782e 100644
--- a/apps/sample-add-timestamp.cpp
+++ b/apps/sample-add-timestamp.cpp
@@ -1,8 +1,6 @@
#include "trygvis/sensor.h"
#include "trygvis/sensor/io.h"
#include "apps.h"
-#include <vector>
-#include "apps.h"
namespace trygvis {
namespace apps {
@@ -13,10 +11,10 @@ using namespace trygvis::sensor;
using namespace trygvis::sensor::io;
namespace po = boost::program_options;
-class TimestampAddingSampleOutputStream : public SampleOutputStream {
+class TimestampAddingSampleOutputStream : public SampleConsumer {
public:
- TimestampAddingSampleOutputStream(shared_ptr<SampleOutputStream> output, KeyDictionary &dict, string timestamp_name)
- : timestamp_key(dict.indexOf(timestamp_name)) {
+ TimestampAddingSampleOutputStream(shared_ptr<SampleConsumer> output, KeyDictionary &dict, string timestamp_name)
+ : output_(output), timestamp_key(dict.indexOf(timestamp_name)) {
if (input_time_resolution_ == time_resolution::MILLISECONDS) {
factor = 1000;
} else {
@@ -24,21 +22,21 @@ public:
}
}
- virtual void write(SampleRecord const &sample) override {
+ virtual void onSample(const SampleRecord &sample) override {
time_t now = time(NULL) * factor;
SampleRecord updated_sample(sample);
updated_sample.set(timestamp_key, std::to_string(now));
- output_->write(updated_sample);
+ output_->onSample(updated_sample);
};
private:
const SampleKey *timestamp_key;
- time_resolution input_time_resolution_;
+ const time_resolution input_time_resolution_ = time_resolution::MILLISECONDS;
int factor;
- shared_ptr<SampleOutputStream> output_;
+ shared_ptr<SampleConsumer> output_;
};
class sample_add_timestamp : public app {
@@ -62,25 +60,22 @@ public:
const int buffer_size = 1024;
int main(app_execution &execution) override {
- shared_ptr<istream> input;
-
- input = shared_ptr<istream>(&cin, noop_deleter);
+ auto stdin = shared_ptr<istream>(&cin, noop_deleter);
+ auto stdout = shared_ptr<ostream>(&cout, noop_deleter);
KeyDictionary dict;
sample_output_stream_options options = {};
- auto unique_output_stream =
- open_sample_output_stream(shared_ptr<ostream>(&cout, noop_deleter), dict, output_format, options);
- shared_ptr<SampleOutputStream> output_stream{std::move(unique_output_stream)};
- auto p = make_shared<TimestampAddingSampleOutputStream>(output_stream, dict, timestamp_name);
+ auto writer = open_sample_writer(stdout, dict, output_format, options);
+ auto p = make_shared<TimestampAddingSampleOutputStream>(std::move(writer), dict, timestamp_name);
auto parser = open_sample_stream_parser(p, dict);
int recordCount = 0;
- while (!input->eof()) {
+ while (!stdin->eof()) {
char buffer[buffer_size];
- input->read(buffer, buffer_size);
- size_t gcount = (size_t) input->gcount();
+ stdin->read(buffer, buffer_size);
+ auto gcount = static_cast<size_t>(stdin->gcount());
recordCount++;
diff --git a/apps/sample-convert.cpp b/apps/sample-convert.cpp
index 81c08ca..2142462 100644
--- a/apps/sample-convert.cpp
+++ b/apps/sample-convert.cpp
@@ -93,13 +93,13 @@ public:
options.push_back(&fs);
}
- unique_ptr<SampleOutputStream> o = open_sample_output_stream(outputStream, dict, output_format, options);
+ unique_ptr<SampleConsumer> o = open_sample_writer(outputStream, dict, output_format, options);
if (add_timestamp) {
- o = make_unique<AddTimestampSampleOutputStream>(move(o), dict, timestamp_field);
+ o = make_unique<AddTimestampSampleConsumer>(move(o), dict, timestamp_field);
}
- shared_ptr<SampleOutputStream> output(move(o));
+ shared_ptr<SampleConsumer> output(move(o));
auto input = make_shared<KeyValueSampleStreamParser>(output, dict);
diff --git a/apps/sample-select.cpp b/apps/sample-select.cpp
index d067af4..1db1443 100644
--- a/apps/sample-select.cpp
+++ b/apps/sample-select.cpp
@@ -91,10 +91,10 @@ public:
}
vector<sample_output_stream_option*> options = {};
- unique_ptr<SampleOutputStream> unique_output_stream = open_sample_output_stream(shared_ptr<ostream>(&cout,
+ unique_ptr<SampleConsumer> unique_output_stream = open_sample_writer(shared_ptr<ostream>(&cout,
noop_deleter), dict, parser->type(), options);
- shared_ptr<SampleOutputStream> output_stream{std::move(unique_output_stream)};
- shared_ptr<SampleOutputStream> p = make_shared<TimestampFixingSampleOutputStream>(output_stream, dict,
+ shared_ptr<SampleConsumer> output_stream{std::move(unique_output_stream)};
+ shared_ptr<SampleConsumer> p = make_shared<TimestampFixingSampleOutputStream>(output_stream, dict,
timestamp_name, relative_name, relative_resolution, start_time);
parser = open_sample_stream_parser(p, dict, parser->type());
diff --git a/apps/sm-get-value.cpp b/apps/sm-get-value.cpp
index 42f5da3..483df70 100644
--- a/apps/sm-get-value.cpp
+++ b/apps/sm-get-value.cpp
@@ -109,7 +109,7 @@ public:
auto epoch = system_clock::now().time_since_epoch();
auto timestamp = duration_cast<seconds>(epoch).count();
auto unique_output_stream = open_sample_output_stream(shared_ptr<ostream>(&cout, noop_deleter), dict, format);
- shared_ptr<SampleOutputStream> output_stream{std::move(unique_output_stream)};
+ shared_ptr<SampleConsumer> output_stream{std::move(unique_output_stream)};
auto sample = SampleRecord(dict)
.set(hostname_key, get_hostname())
@@ -138,7 +138,7 @@ public:
i++;
}
- output_stream->write(sample);
+ output_stream->onSample(sample);
}
void withConnection(sample_format_type format, shared_ptr<BluetoothGatt> gatt) {
diff --git a/apps/sm-serial-read-all.cpp b/apps/sm-serial-read-all.cpp
index a81a016..3c2b617 100644
--- a/apps/sm-serial-read-all.cpp
+++ b/apps/sm-serial-read-all.cpp
@@ -115,7 +115,7 @@ public:
auto output_stream = shared_ptr<ostream>(&cout, noop_deleter);
auto output = open_sample_output_stream(output_stream, outputDict, format);
auto tso = thread_safe_sample_output_stream(move(output));
- shared_ptr<SampleOutputStream> thread_safe_output(move(tso));
+ shared_ptr<SampleConsumer> thread_safe_output(move(tso));
while (run) {
// Port cleanup
diff --git a/apps/sm-serial-read.cpp b/apps/sm-serial-read.cpp
index 980a3f6..69931a0 100644
--- a/apps/sm-serial-read.cpp
+++ b/apps/sm-serial-read.cpp
@@ -90,7 +90,7 @@ public:
}
shared_ptr<ostream> outputStream = shared_ptr<ostream>(&cout, noop_deleter);
- shared_ptr<SampleOutputStream> output = open_sample_output_stream(outputStream, dict, format);
+ shared_ptr<SampleConsumer> output = open_sample_output_stream(outputStream, dict, format);
shared_ptr<KeyValueSampleStreamParser> input = make_shared<KeyValueSampleStreamParser>(output, dict);
diff --git a/sensor/include/trygvis/sensor/io.h b/sensor/include/trygvis/sensor/io.h
index 304091b..403af9b 100644
--- a/sensor/include/trygvis/sensor/io.h
+++ b/sensor/include/trygvis/sensor/io.h
@@ -4,6 +4,7 @@
#include <mutex>
#include <boost/asio/buffer.hpp>
+#include <utility>
namespace trygvis {
namespace sensor {
@@ -13,47 +14,40 @@ using namespace std;
using namespace boost::asio;
struct sample_output_stream_option {
- virtual ~sample_output_stream_option() {
- };
+ virtual ~sample_output_stream_option() = default;;
};
struct output_fields_option : sample_output_stream_option {
- ~output_fields_option() {
- }
+ ~output_fields_option() override = default;
vector<string> fields;
};
struct timestamp_field_option : sample_output_stream_option {
- timestamp_field_option(const string name) : name(name) {
- }
+ explicit timestamp_field_option(string name) : name(std::move(name)) {}
- ~timestamp_field_option() {
- }
+ ~timestamp_field_option() override = default;
const string name;
};
class table_name_option : public sample_output_stream_option {
public:
- table_name_option(const string name) : name(name) {
- }
+ explicit table_name_option(string name) : name(std::move(name)) {}
- ~table_name_option() {
- }
+ ~table_name_option() override = default;
const string name;
};
class sample_output_stream_options : public vector<sample_output_stream_option *> {
public:
- ~sample_output_stream_options() {
- }
+ ~sample_output_stream_options() = default;
template<typename T>
o<T *> find_option() const {
- for (auto it = begin(); it != end(); ++it) {
- T *x = dynamic_cast<T *>(*it);
+ for (auto it : *this) {
+ auto *x = dynamic_cast<T *>(it);
if (x != nullptr) {
return o<T *>(x);
@@ -65,90 +59,85 @@ public:
};
struct missing_required_option_error : runtime_error {
- missing_required_option_error(string what) : runtime_error(what) {
- }
+ explicit missing_required_option_error(const string &what) : runtime_error(what) {}
- ~missing_required_option_error() {
- }
+ ~missing_required_option_error() override = default;
};
class SampleStreamParser;
-class SampleOutputStream;
+class SampleConsumer;
/**
* Throws missing_required_option_error
*/
unique_ptr<SampleStreamParser> open_sample_stream_parser(
- shared_ptr<SampleOutputStream> output,
+ shared_ptr<SampleConsumer> output,
KeyDictionary &dict,
sample_format_type type = sample_format_type::AUTO);
/**
* Throws missing_required_option_error
*/
-unique_ptr<SampleOutputStream> open_sample_output_stream(
+unique_ptr<SampleConsumer> open_sample_writer(
shared_ptr<ostream> output,
KeyDictionary &dict,
sample_format_type type,
sample_output_stream_options options);
static inline
-unique_ptr<SampleOutputStream> open_sample_output_stream(
+unique_ptr<SampleConsumer> open_sample_output_stream(
shared_ptr<ostream> output,
KeyDictionary &dict,
sample_format_type type) {
sample_output_stream_options options;
- return open_sample_output_stream(output, dict, type, options);
+ return open_sample_writer(std::move(output), dict, type, options);
}
-class SampleOutputStream {
+class SampleConsumer {
public:
- virtual void write(SampleRecord const &sample) = 0;
+ virtual void onSample(SampleRecord const &sample) = 0;
};
-class VectorSampleOutputStream : public SampleOutputStream {
+class VectorSampleOutputStream : public SampleConsumer {
public:
- virtual void write(SampleRecord const &sample) override;
+ void onSample(SampleRecord const &sample) override;
-public:
vector<SampleRecord> samples;
};
-class ThreadSafeSampleOutputStream : public SampleOutputStream {
+class ThreadSafeSampleConsumer : public SampleConsumer {
public:
- ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying);
+ explicit ThreadSafeSampleConsumer(unique_ptr<SampleConsumer> underlying);
- ~ThreadSafeSampleOutputStream() {
- }
+ ~ThreadSafeSampleConsumer() = default;
- void write(SampleRecord const &sample) override;
+ void onSample(SampleRecord const &sample) override;
private:
- unique_ptr<SampleOutputStream> underlying;
+ unique_ptr<SampleConsumer> underlying;
std::mutex mutex;
};
-class AddTimestampSampleOutputStream : public SampleOutputStream {
+class AddTimestampSampleConsumer : public SampleConsumer {
public:
- AddTimestampSampleOutputStream(unique_ptr<SampleOutputStream> underlying, KeyDictionary &dict, const string &timestamp_name);
+ AddTimestampSampleConsumer(unique_ptr<SampleConsumer> underlying, KeyDictionary &dict, const string &timestamp_name);
- ~AddTimestampSampleOutputStream() {
- }
+ ~AddTimestampSampleConsumer() = default;
- void write(SampleRecord const &sample) override;
+ void onSample(SampleRecord const &sample) override;
private:
- unique_ptr<SampleOutputStream> underlying_;
+ unique_ptr<SampleConsumer> underlying_;
const SampleKey* timestamp_key;
};
-class CsvSampleOutputStream : public SampleOutputStream {
+class CsvWriterSampleConsumer : public SampleConsumer {
public:
- CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict);
+ CsvWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict);
- void write(SampleRecord const &sample);
+ void onSample(SampleRecord const &sample) override;
private:
void writeHeader();
@@ -158,33 +147,33 @@ private:
bool headerWritten;
};
-class JsonSampleOutputStream : public SampleOutputStream {
+class JsonWriterSampleConsumer : public SampleConsumer {
public:
- JsonSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict);
+ JsonWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict);
- void write(SampleRecord const &sample) override;
+ void onSample(SampleRecord const &sample) override;
private:
KeyDictionary &dict;
shared_ptr<ostream> stream;
};
-class KeyValueSampleOutputStream : public SampleOutputStream {
+class KeyValueWriterSampleConsumer : public SampleConsumer {
public:
- KeyValueSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict);
+ KeyValueWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict);
- void write(SampleRecord const &sample) override;
+ void onSample(SampleRecord const &sample) override;
private:
KeyDictionary &dict;
shared_ptr<ostream> stream;
};
-class RrdSampleOutputStream : public SampleOutputStream {
+class RrdWriterSampleConsumer : public SampleConsumer {
public:
- RrdSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey *timestamp_key, o<output_fields_option *> output_fields);
+ RrdWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey *timestamp_key, o<output_fields_option *> output_fields);
- void write(SampleRecord const &sample) override;
+ void onSample(SampleRecord const &sample) override;
private:
vector<const SampleKey *> keys;
@@ -192,11 +181,11 @@ private:
const SampleKey *timestamp_key;
};
-class SqlSampleOutputStream : public SampleOutputStream {
+class SqlWriterSampleConsumer : public SampleConsumer {
public:
- SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name);
+ SqlWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name);
- void write(SampleRecord const &sample) override;
+ void onSample(SampleRecord const &sample) override;
private:
KeyDictionary &dict;
@@ -235,8 +224,8 @@ protected:
class KeyValueSampleStreamParser : public SampleStreamParser {
public:
- KeyValueSampleStreamParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) :
- SampleStreamParser(sample_format_type::CSV, dict), output(output),
+ KeyValueSampleStreamParser(shared_ptr<SampleConsumer> output, KeyDictionary &dict) :
+ SampleStreamParser(sample_format_type::CSV, dict), output(std::move(output)),
line(make_shared<vector<uint8_t>>()) {
}
@@ -248,17 +237,17 @@ private:
void process_line(shared_ptr<vector<uint8_t>> &packet);
static const uint8_t packet_delimiter = '\n';
- shared_ptr<SampleOutputStream> output;
+ shared_ptr<SampleConsumer> output;
shared_ptr<vector<uint8_t>> line;
};
class AutoSampleParser : public SampleStreamParser {
public:
- AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict);
+ AutoSampleParser(shared_ptr<SampleConsumer> output, KeyDictionary &dict);
- virtual int process(mutable_buffers_1 &buffer) override;
+ int process(mutable_buffers_1 &buffer) override;
- virtual int finish() override;
+ int finish() override;
private:
unique_ptr<SampleStreamParser> parser;
@@ -266,10 +255,10 @@ private:
};
static inline
-unique_ptr<ThreadSafeSampleOutputStream> thread_safe_sample_output_stream(unique_ptr<SampleOutputStream> underlying) {
- return make_unique<ThreadSafeSampleOutputStream>(move(underlying));
+unique_ptr<ThreadSafeSampleConsumer> thread_safe_sample_output_stream(unique_ptr<SampleConsumer> underlying) {
+ return make_unique<ThreadSafeSampleConsumer>(move(underlying));
};
-}
-}
-}
+} // namespace io
+} // namespace sensor
+} // namespace trygvis
diff --git a/sensor/main/io.cpp b/sensor/main/io.cpp
index 9e822c3..a0dcdc9 100644
--- a/sensor/main/io.cpp
+++ b/sensor/main/io.cpp
@@ -16,7 +16,7 @@ using boost::escaped_list_separator;
using json = nlohmann::json;
unique_ptr<SampleStreamParser> open_sample_stream_parser(
- shared_ptr<SampleOutputStream> output,
+ shared_ptr<SampleConsumer> output,
KeyDictionary &dict,
sample_format_type type) {
if (type == sample_format_type::KEY_VALUE) {
@@ -28,18 +28,18 @@ unique_ptr<SampleStreamParser> open_sample_stream_parser(
}
}
-unique_ptr<SampleOutputStream> open_sample_output_stream(
+unique_ptr<SampleConsumer> open_sample_writer(
shared_ptr<ostream> output,
KeyDictionary &dict,
sample_format_type type,
sample_output_stream_options options) {
if (type == sample_format_type::CSV) {
- return make_unique<CsvSampleOutputStream>(output, dict);
+ return make_unique<CsvWriterSampleConsumer>(output, dict);
} else if (type == sample_format_type::KEY_VALUE) {
- return make_unique<KeyValueSampleOutputStream>(output, dict);
+ return make_unique<KeyValueWriterSampleConsumer>(output, dict);
} else if (type == sample_format_type::JSON) {
- return make_unique<JsonSampleOutputStream>(output, dict);
+ return make_unique<JsonWriterSampleConsumer>(output, dict);
} else if (type == sample_format_type::RRD) {
auto of = options.find_option<output_fields_option>();
@@ -47,7 +47,7 @@ unique_ptr<SampleOutputStream> open_sample_output_stream(
auto timestamp_key = dict.indexOf(tsf ? tsf.value()->name : "timestamp");
- return make_unique<RrdSampleOutputStream>(output, dict, timestamp_key, of);
+ return make_unique<RrdWriterSampleConsumer>(output, dict, timestamp_key, of);
} else if (type == sample_format_type::SQL) {
auto tno = options.find_option<table_name_option>();
@@ -55,30 +55,30 @@ unique_ptr<SampleOutputStream> open_sample_output_stream(
throw missing_required_option_error("table name");
}
- return make_unique<SqlSampleOutputStream>(move(output), dict, tno.value()->name);
+ return make_unique<SqlWriterSampleConsumer>(move(output), dict, tno.value()->name);
} else {
throw sample_exception("No writer for format type: " + to_string(type));
}
}
-ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying)
+ThreadSafeSampleConsumer::ThreadSafeSampleConsumer(unique_ptr<SampleConsumer> underlying)
: underlying(move(underlying)) {
}
-void ThreadSafeSampleOutputStream::write(SampleRecord const &sample) {
+void ThreadSafeSampleConsumer::onSample(SampleRecord const &sample) {
std::unique_lock<std::mutex> lock(mutex);
- underlying->write(sample);
+ underlying->onSample(sample);
}
-AddTimestampSampleOutputStream::AddTimestampSampleOutputStream(unique_ptr<SampleOutputStream> underlying,
+AddTimestampSampleConsumer::AddTimestampSampleConsumer(unique_ptr<SampleConsumer> underlying,
KeyDictionary &dict,
const string &timestamp_name) : underlying_(move(underlying)), timestamp_key(dict.indexOf(timestamp_name)) {
}
-void AddTimestampSampleOutputStream::write(SampleRecord const &sample) {
+void AddTimestampSampleConsumer::onSample(SampleRecord const &sample) {
if (sample.at(timestamp_key)) {
- underlying_->write(sample);
+ underlying_->onSample(sample);
return;
}
@@ -88,10 +88,10 @@ void AddTimestampSampleOutputStream::write(SampleRecord const &sample) {
SampleRecord copy = sample;
copy.set(timestamp_key, timestamp_s);
- underlying_->write(copy);
+ underlying_->onSample(copy);
}
-void VectorSampleOutputStream::write(SampleRecord const &sample) {
+void VectorSampleOutputStream::onSample(SampleRecord const &sample) {
if (sample.empty()) {
return;
}
@@ -99,11 +99,11 @@ void VectorSampleOutputStream::write(SampleRecord const &sample) {
samples.emplace_back(sample);
}
-CsvSampleOutputStream::CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict)
+CsvWriterSampleConsumer::CsvWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict)
: stream(move(stream)), headerWritten(false), dict(dict) {
}
-void CsvSampleOutputStream::write(SampleRecord const &sample) {
+void CsvWriterSampleConsumer::onSample(SampleRecord const &sample) {
// Skip empty records
if (sample.empty()) {
return;
@@ -151,7 +151,7 @@ void CsvSampleOutputStream::write(SampleRecord const &sample) {
s << endl << flush;
}
-void CsvSampleOutputStream::writeHeader() {
+void CsvWriterSampleConsumer::writeHeader() {
auto &s = *stream;
auto i = dict.begin();
@@ -168,11 +168,11 @@ void CsvSampleOutputStream::writeHeader() {
s << endl << flush;
}
-JsonSampleOutputStream::JsonSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict) :
+JsonWriterSampleConsumer::JsonWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict) :
dict(dict), stream(move(stream)) {
}
-void JsonSampleOutputStream::write(SampleRecord const &sample) {
+void JsonWriterSampleConsumer::onSample(SampleRecord const &sample) {
// Skip empty records
if (sample.empty()) {
return;
@@ -205,11 +205,11 @@ void JsonSampleOutputStream::write(SampleRecord const &sample) {
*stream.get() << doc << endl << flush;
}
-KeyValueSampleOutputStream::KeyValueSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict) :
+KeyValueWriterSampleConsumer::KeyValueWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict) :
dict(dict), stream(move(stream)) {
}
-void KeyValueSampleOutputStream::write(SampleRecord const &sample) {
+void KeyValueWriterSampleConsumer::onSample(SampleRecord const &sample) {
// Skip empty records
if (sample.empty()) {
return;
@@ -253,7 +253,7 @@ void KeyValueSampleOutputStream::write(SampleRecord const &sample) {
*s << endl << flush;
}
-RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr<ostream> stream,
+RrdWriterSampleConsumer::RrdWriterSampleConsumer(shared_ptr<ostream> stream,
KeyDictionary &dict,
const SampleKey *timestamp_key,
o<output_fields_option *> output_fields) :
@@ -270,7 +270,7 @@ RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr<ostream> stream,
}
}
-void RrdSampleOutputStream::write(SampleRecord const &sample) {
+void RrdWriterSampleConsumer::onSample(SampleRecord const &sample) {
// Skip empty records
if (sample.empty()) {
return;
@@ -309,11 +309,11 @@ void RrdSampleOutputStream::write(SampleRecord const &sample) {
s << endl << flush;
}
-SqlSampleOutputStream::SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name) :
+SqlWriterSampleConsumer::SqlWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name) :
dict(dict), stream(move(stream)), table_name(table_name) {
}
-void SqlSampleOutputStream::write(SampleRecord const &sample) {
+void SqlWriterSampleConsumer::onSample(SampleRecord const &sample) {
string fs, vs;
fs.reserve(1024);
@@ -425,10 +425,10 @@ void KeyValueSampleStreamParser::process_line(shared_ptr<vector<uint8_t>> &packe
sample.set(key, value);
}
- output->write(sample);
+ output->onSample(sample);
}
-AutoSampleParser::AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) :
+AutoSampleParser::AutoSampleParser(shared_ptr<SampleConsumer> output, KeyDictionary &dict) :
SampleStreamParser(sample_format_type::AUTO, dict), keyValueParser(new KeyValueSampleStreamParser(output, dict)) {
// Directly select the parser now until we have more than one parser
parser = std::move(keyValueParser);