diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2017-09-01 13:59:25 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2017-09-01 14:06:01 +0200 |
commit | 1f2c7aae9fdd39a478944ccda5c9e82d76ab5db6 (patch) | |
tree | 691f851318c60465dd5d4c837136e991026432ff | |
parent | 3af4d83dfa190d8d95aefbf31ae0b1d85abe492a (diff) | |
download | ble-toys-1f2c7aae9fdd39a478944ccda5c9e82d76ab5db6.tar.gz ble-toys-1f2c7aae9fdd39a478944ccda5c9e82d76ab5db6.tar.bz2 ble-toys-1f2c7aae9fdd39a478944ccda5c9e82d76ab5db6.tar.xz ble-toys-1f2c7aae9fdd39a478944ccda5c9e82d76ab5db6.zip |
o Renaming SampleOutputStream to SampleConsumer.
o Some c++ style fixes.
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | CMakeLists.txt | 21 | ||||
-rw-r--r-- | apps/apps.h | 10 | ||||
-rw-r--r-- | apps/mqtt-publish.cpp | 10 | ||||
-rw-r--r-- | apps/sample-add-timestamp.cpp | 33 | ||||
-rw-r--r-- | apps/sample-convert.cpp | 6 | ||||
-rw-r--r-- | apps/sample-select.cpp | 6 | ||||
-rw-r--r-- | apps/sm-get-value.cpp | 4 | ||||
-rw-r--r-- | apps/sm-serial-read-all.cpp | 2 | ||||
-rw-r--r-- | apps/sm-serial-read.cpp | 2 | ||||
-rw-r--r-- | sensor/include/trygvis/sensor/io.h | 125 | ||||
-rw-r--r-- | sensor/main/io.cpp | 56 |
12 files changed, 131 insertions, 145 deletions
@@ -5,3 +5,4 @@ build *.tmp.* tmp.* *.tmp +cmake-build-* diff --git a/CMakeLists.txt b/CMakeLists.txt index 9da7eb6..8c64151 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -7,11 +7,18 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14") set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -DDEBUG") set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -DDEBUG") -IF(NOT CMAKE_BUILD_TYPE) - SET(CMAKE_BUILD_TYPE Debug CACHE STRING - "Choose the type of build, options are: None Debug Release RelWithDebInfo MinSizeRel." - FORCE) -ENDIF(NOT CMAKE_BUILD_TYPE) +if (NOT CMAKE_BUILD_TYPE) + SET(CMAKE_BUILD_TYPE Debug CACHE STRING + "Choose the type of build, options are: None Debug Release RelWithDebInfo MinSizeRel." + FORCE) +endif (NOT CMAKE_BUILD_TYPE) + +if (CMAKE_C_COMPILER_ID STREQUAL "GNU" OR CMAKE_C_COMPILER_ID STREQUAL "Clang") + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -flto") +endif () +if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU" OR CMAKE_CXX_COMPILER_ID STREQUAL "Clang") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -flto") +endif () # Boost set(Boost_USE_STATIC_LIBS OFF) @@ -37,8 +44,8 @@ if (DOXYGEN_FOUND) WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR} COMMENT "Generating documentation" VERBATIM) - add_custom_target(doc DEPENDS ${doxy_html_index_file} ) -endif(DOXYGEN_FOUND) + add_custom_target(doc DEPENDS ${doxy_html_index_file}) +endif (DOXYGEN_FOUND) add_subdirectory(ble) add_subdirectory(apps) diff --git a/apps/apps.h b/apps/apps.h index 9173ae0..21a25a3 100644 --- a/apps/apps.h +++ b/apps/apps.h @@ -126,12 +126,6 @@ std::string get_hostname(); int real_main(app *app, int argc, const char *argv[]); -template<typename T> -class noop_delete { -public: - void operator()(T *) const { } -}; - static inline void noop_deleter(void *) { } -} -} +} // namespace apps +} // namespace trygvis diff --git a/apps/mqtt-publish.cpp b/apps/mqtt-publish.cpp index 25e8ae9..1498b73 100644 --- a/apps/mqtt-publish.cpp +++ b/apps/mqtt-publish.cpp @@ -22,12 +22,12 @@ using namespace trygvis::sensor; using namespace trygvis::sensor::io; using namespace trygvis::mqtt_support; -class MqttSampleOutputStream : public SampleOutputStream { +class MqttSampleOutputStream : public SampleConsumer { public: MqttSampleOutputStream(const o<string> &client_id, bool clean_session, string host, unsigned int port, string topic_name, unsigned int keep_alive) - : SampleOutputStream(), + : SampleConsumer(), client(host, port, keep_alive, client_id, clean_session), topic_name(topic_name) { client.connect(); @@ -37,15 +37,15 @@ public: client.disconnect(); }; - void write(SampleRecord const &sample) override { + void onSample(SampleRecord const &sample) override { if (sample.empty()) { return; } // make a string of the sample auto buf = make_shared<stringstream>(); - KeyValueSampleOutputStream out(buf, sample.dict); - out.write(sample); + KeyValueWriterSampleConsumer out(buf, sample.dict); + out.onSample(sample); string s = buf->str(); cout << "sample: " << s; diff --git a/apps/sample-add-timestamp.cpp b/apps/sample-add-timestamp.cpp index 8bc4bac..841782e 100644 --- a/apps/sample-add-timestamp.cpp +++ b/apps/sample-add-timestamp.cpp @@ -1,8 +1,6 @@ #include "trygvis/sensor.h" #include "trygvis/sensor/io.h" #include "apps.h" -#include <vector> -#include "apps.h" namespace trygvis { namespace apps { @@ -13,10 +11,10 @@ using namespace trygvis::sensor; using namespace trygvis::sensor::io; namespace po = boost::program_options; -class TimestampAddingSampleOutputStream : public SampleOutputStream { +class TimestampAddingSampleOutputStream : public SampleConsumer { public: - TimestampAddingSampleOutputStream(shared_ptr<SampleOutputStream> output, KeyDictionary &dict, string timestamp_name) - : timestamp_key(dict.indexOf(timestamp_name)) { + TimestampAddingSampleOutputStream(shared_ptr<SampleConsumer> output, KeyDictionary &dict, string timestamp_name) + : output_(output), timestamp_key(dict.indexOf(timestamp_name)) { if (input_time_resolution_ == time_resolution::MILLISECONDS) { factor = 1000; } else { @@ -24,21 +22,21 @@ public: } } - virtual void write(SampleRecord const &sample) override { + virtual void onSample(const SampleRecord &sample) override { time_t now = time(NULL) * factor; SampleRecord updated_sample(sample); updated_sample.set(timestamp_key, std::to_string(now)); - output_->write(updated_sample); + output_->onSample(updated_sample); }; private: const SampleKey *timestamp_key; - time_resolution input_time_resolution_; + const time_resolution input_time_resolution_ = time_resolution::MILLISECONDS; int factor; - shared_ptr<SampleOutputStream> output_; + shared_ptr<SampleConsumer> output_; }; class sample_add_timestamp : public app { @@ -62,25 +60,22 @@ public: const int buffer_size = 1024; int main(app_execution &execution) override { - shared_ptr<istream> input; - - input = shared_ptr<istream>(&cin, noop_deleter); + auto stdin = shared_ptr<istream>(&cin, noop_deleter); + auto stdout = shared_ptr<ostream>(&cout, noop_deleter); KeyDictionary dict; sample_output_stream_options options = {}; - auto unique_output_stream = - open_sample_output_stream(shared_ptr<ostream>(&cout, noop_deleter), dict, output_format, options); - shared_ptr<SampleOutputStream> output_stream{std::move(unique_output_stream)}; - auto p = make_shared<TimestampAddingSampleOutputStream>(output_stream, dict, timestamp_name); + auto writer = open_sample_writer(stdout, dict, output_format, options); + auto p = make_shared<TimestampAddingSampleOutputStream>(std::move(writer), dict, timestamp_name); auto parser = open_sample_stream_parser(p, dict); int recordCount = 0; - while (!input->eof()) { + while (!stdin->eof()) { char buffer[buffer_size]; - input->read(buffer, buffer_size); - size_t gcount = (size_t) input->gcount(); + stdin->read(buffer, buffer_size); + auto gcount = static_cast<size_t>(stdin->gcount()); recordCount++; diff --git a/apps/sample-convert.cpp b/apps/sample-convert.cpp index 81c08ca..2142462 100644 --- a/apps/sample-convert.cpp +++ b/apps/sample-convert.cpp @@ -93,13 +93,13 @@ public: options.push_back(&fs); } - unique_ptr<SampleOutputStream> o = open_sample_output_stream(outputStream, dict, output_format, options); + unique_ptr<SampleConsumer> o = open_sample_writer(outputStream, dict, output_format, options); if (add_timestamp) { - o = make_unique<AddTimestampSampleOutputStream>(move(o), dict, timestamp_field); + o = make_unique<AddTimestampSampleConsumer>(move(o), dict, timestamp_field); } - shared_ptr<SampleOutputStream> output(move(o)); + shared_ptr<SampleConsumer> output(move(o)); auto input = make_shared<KeyValueSampleStreamParser>(output, dict); diff --git a/apps/sample-select.cpp b/apps/sample-select.cpp index d067af4..1db1443 100644 --- a/apps/sample-select.cpp +++ b/apps/sample-select.cpp @@ -91,10 +91,10 @@ public: } vector<sample_output_stream_option*> options = {}; - unique_ptr<SampleOutputStream> unique_output_stream = open_sample_output_stream(shared_ptr<ostream>(&cout, + unique_ptr<SampleConsumer> unique_output_stream = open_sample_writer(shared_ptr<ostream>(&cout, noop_deleter), dict, parser->type(), options); - shared_ptr<SampleOutputStream> output_stream{std::move(unique_output_stream)}; - shared_ptr<SampleOutputStream> p = make_shared<TimestampFixingSampleOutputStream>(output_stream, dict, + shared_ptr<SampleConsumer> output_stream{std::move(unique_output_stream)}; + shared_ptr<SampleConsumer> p = make_shared<TimestampFixingSampleOutputStream>(output_stream, dict, timestamp_name, relative_name, relative_resolution, start_time); parser = open_sample_stream_parser(p, dict, parser->type()); diff --git a/apps/sm-get-value.cpp b/apps/sm-get-value.cpp index 42f5da3..483df70 100644 --- a/apps/sm-get-value.cpp +++ b/apps/sm-get-value.cpp @@ -109,7 +109,7 @@ public: auto epoch = system_clock::now().time_since_epoch(); auto timestamp = duration_cast<seconds>(epoch).count(); auto unique_output_stream = open_sample_output_stream(shared_ptr<ostream>(&cout, noop_deleter), dict, format); - shared_ptr<SampleOutputStream> output_stream{std::move(unique_output_stream)}; + shared_ptr<SampleConsumer> output_stream{std::move(unique_output_stream)}; auto sample = SampleRecord(dict) .set(hostname_key, get_hostname()) @@ -138,7 +138,7 @@ public: i++; } - output_stream->write(sample); + output_stream->onSample(sample); } void withConnection(sample_format_type format, shared_ptr<BluetoothGatt> gatt) { diff --git a/apps/sm-serial-read-all.cpp b/apps/sm-serial-read-all.cpp index a81a016..3c2b617 100644 --- a/apps/sm-serial-read-all.cpp +++ b/apps/sm-serial-read-all.cpp @@ -115,7 +115,7 @@ public: auto output_stream = shared_ptr<ostream>(&cout, noop_deleter); auto output = open_sample_output_stream(output_stream, outputDict, format); auto tso = thread_safe_sample_output_stream(move(output)); - shared_ptr<SampleOutputStream> thread_safe_output(move(tso)); + shared_ptr<SampleConsumer> thread_safe_output(move(tso)); while (run) { // Port cleanup diff --git a/apps/sm-serial-read.cpp b/apps/sm-serial-read.cpp index 980a3f6..69931a0 100644 --- a/apps/sm-serial-read.cpp +++ b/apps/sm-serial-read.cpp @@ -90,7 +90,7 @@ public: } shared_ptr<ostream> outputStream = shared_ptr<ostream>(&cout, noop_deleter); - shared_ptr<SampleOutputStream> output = open_sample_output_stream(outputStream, dict, format); + shared_ptr<SampleConsumer> output = open_sample_output_stream(outputStream, dict, format); shared_ptr<KeyValueSampleStreamParser> input = make_shared<KeyValueSampleStreamParser>(output, dict); diff --git a/sensor/include/trygvis/sensor/io.h b/sensor/include/trygvis/sensor/io.h index 304091b..403af9b 100644 --- a/sensor/include/trygvis/sensor/io.h +++ b/sensor/include/trygvis/sensor/io.h @@ -4,6 +4,7 @@ #include <mutex> #include <boost/asio/buffer.hpp> +#include <utility> namespace trygvis { namespace sensor { @@ -13,47 +14,40 @@ using namespace std; using namespace boost::asio; struct sample_output_stream_option { - virtual ~sample_output_stream_option() { - }; + virtual ~sample_output_stream_option() = default;; }; struct output_fields_option : sample_output_stream_option { - ~output_fields_option() { - } + ~output_fields_option() override = default; vector<string> fields; }; struct timestamp_field_option : sample_output_stream_option { - timestamp_field_option(const string name) : name(name) { - } + explicit timestamp_field_option(string name) : name(std::move(name)) {} - ~timestamp_field_option() { - } + ~timestamp_field_option() override = default; const string name; }; class table_name_option : public sample_output_stream_option { public: - table_name_option(const string name) : name(name) { - } + explicit table_name_option(string name) : name(std::move(name)) {} - ~table_name_option() { - } + ~table_name_option() override = default; const string name; }; class sample_output_stream_options : public vector<sample_output_stream_option *> { public: - ~sample_output_stream_options() { - } + ~sample_output_stream_options() = default; template<typename T> o<T *> find_option() const { - for (auto it = begin(); it != end(); ++it) { - T *x = dynamic_cast<T *>(*it); + for (auto it : *this) { + auto *x = dynamic_cast<T *>(it); if (x != nullptr) { return o<T *>(x); @@ -65,90 +59,85 @@ public: }; struct missing_required_option_error : runtime_error { - missing_required_option_error(string what) : runtime_error(what) { - } + explicit missing_required_option_error(const string &what) : runtime_error(what) {} - ~missing_required_option_error() { - } + ~missing_required_option_error() override = default; }; class SampleStreamParser; -class SampleOutputStream; +class SampleConsumer; /** * Throws missing_required_option_error */ unique_ptr<SampleStreamParser> open_sample_stream_parser( - shared_ptr<SampleOutputStream> output, + shared_ptr<SampleConsumer> output, KeyDictionary &dict, sample_format_type type = sample_format_type::AUTO); /** * Throws missing_required_option_error */ -unique_ptr<SampleOutputStream> open_sample_output_stream( +unique_ptr<SampleConsumer> open_sample_writer( shared_ptr<ostream> output, KeyDictionary &dict, sample_format_type type, sample_output_stream_options options); static inline -unique_ptr<SampleOutputStream> open_sample_output_stream( +unique_ptr<SampleConsumer> open_sample_output_stream( shared_ptr<ostream> output, KeyDictionary &dict, sample_format_type type) { sample_output_stream_options options; - return open_sample_output_stream(output, dict, type, options); + return open_sample_writer(std::move(output), dict, type, options); } -class SampleOutputStream { +class SampleConsumer { public: - virtual void write(SampleRecord const &sample) = 0; + virtual void onSample(SampleRecord const &sample) = 0; }; -class VectorSampleOutputStream : public SampleOutputStream { +class VectorSampleOutputStream : public SampleConsumer { public: - virtual void write(SampleRecord const &sample) override; + void onSample(SampleRecord const &sample) override; -public: vector<SampleRecord> samples; }; -class ThreadSafeSampleOutputStream : public SampleOutputStream { +class ThreadSafeSampleConsumer : public SampleConsumer { public: - ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying); + explicit ThreadSafeSampleConsumer(unique_ptr<SampleConsumer> underlying); - ~ThreadSafeSampleOutputStream() { - } + ~ThreadSafeSampleConsumer() = default; - void write(SampleRecord const &sample) override; + void onSample(SampleRecord const &sample) override; private: - unique_ptr<SampleOutputStream> underlying; + unique_ptr<SampleConsumer> underlying; std::mutex mutex; }; -class AddTimestampSampleOutputStream : public SampleOutputStream { +class AddTimestampSampleConsumer : public SampleConsumer { public: - AddTimestampSampleOutputStream(unique_ptr<SampleOutputStream> underlying, KeyDictionary &dict, const string ×tamp_name); + AddTimestampSampleConsumer(unique_ptr<SampleConsumer> underlying, KeyDictionary &dict, const string ×tamp_name); - ~AddTimestampSampleOutputStream() { - } + ~AddTimestampSampleConsumer() = default; - void write(SampleRecord const &sample) override; + void onSample(SampleRecord const &sample) override; private: - unique_ptr<SampleOutputStream> underlying_; + unique_ptr<SampleConsumer> underlying_; const SampleKey* timestamp_key; }; -class CsvSampleOutputStream : public SampleOutputStream { +class CsvWriterSampleConsumer : public SampleConsumer { public: - CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict); + CsvWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict); - void write(SampleRecord const &sample); + void onSample(SampleRecord const &sample) override; private: void writeHeader(); @@ -158,33 +147,33 @@ private: bool headerWritten; }; -class JsonSampleOutputStream : public SampleOutputStream { +class JsonWriterSampleConsumer : public SampleConsumer { public: - JsonSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict); + JsonWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict); - void write(SampleRecord const &sample) override; + void onSample(SampleRecord const &sample) override; private: KeyDictionary &dict; shared_ptr<ostream> stream; }; -class KeyValueSampleOutputStream : public SampleOutputStream { +class KeyValueWriterSampleConsumer : public SampleConsumer { public: - KeyValueSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict); + KeyValueWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict); - void write(SampleRecord const &sample) override; + void onSample(SampleRecord const &sample) override; private: KeyDictionary &dict; shared_ptr<ostream> stream; }; -class RrdSampleOutputStream : public SampleOutputStream { +class RrdWriterSampleConsumer : public SampleConsumer { public: - RrdSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey *timestamp_key, o<output_fields_option *> output_fields); + RrdWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey *timestamp_key, o<output_fields_option *> output_fields); - void write(SampleRecord const &sample) override; + void onSample(SampleRecord const &sample) override; private: vector<const SampleKey *> keys; @@ -192,11 +181,11 @@ private: const SampleKey *timestamp_key; }; -class SqlSampleOutputStream : public SampleOutputStream { +class SqlWriterSampleConsumer : public SampleConsumer { public: - SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name); + SqlWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name); - void write(SampleRecord const &sample) override; + void onSample(SampleRecord const &sample) override; private: KeyDictionary &dict; @@ -235,8 +224,8 @@ protected: class KeyValueSampleStreamParser : public SampleStreamParser { public: - KeyValueSampleStreamParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) : - SampleStreamParser(sample_format_type::CSV, dict), output(output), + KeyValueSampleStreamParser(shared_ptr<SampleConsumer> output, KeyDictionary &dict) : + SampleStreamParser(sample_format_type::CSV, dict), output(std::move(output)), line(make_shared<vector<uint8_t>>()) { } @@ -248,17 +237,17 @@ private: void process_line(shared_ptr<vector<uint8_t>> &packet); static const uint8_t packet_delimiter = '\n'; - shared_ptr<SampleOutputStream> output; + shared_ptr<SampleConsumer> output; shared_ptr<vector<uint8_t>> line; }; class AutoSampleParser : public SampleStreamParser { public: - AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict); + AutoSampleParser(shared_ptr<SampleConsumer> output, KeyDictionary &dict); - virtual int process(mutable_buffers_1 &buffer) override; + int process(mutable_buffers_1 &buffer) override; - virtual int finish() override; + int finish() override; private: unique_ptr<SampleStreamParser> parser; @@ -266,10 +255,10 @@ private: }; static inline -unique_ptr<ThreadSafeSampleOutputStream> thread_safe_sample_output_stream(unique_ptr<SampleOutputStream> underlying) { - return make_unique<ThreadSafeSampleOutputStream>(move(underlying)); +unique_ptr<ThreadSafeSampleConsumer> thread_safe_sample_output_stream(unique_ptr<SampleConsumer> underlying) { + return make_unique<ThreadSafeSampleConsumer>(move(underlying)); }; -} -} -} +} // namespace io +} // namespace sensor +} // namespace trygvis diff --git a/sensor/main/io.cpp b/sensor/main/io.cpp index 9e822c3..a0dcdc9 100644 --- a/sensor/main/io.cpp +++ b/sensor/main/io.cpp @@ -16,7 +16,7 @@ using boost::escaped_list_separator; using json = nlohmann::json; unique_ptr<SampleStreamParser> open_sample_stream_parser( - shared_ptr<SampleOutputStream> output, + shared_ptr<SampleConsumer> output, KeyDictionary &dict, sample_format_type type) { if (type == sample_format_type::KEY_VALUE) { @@ -28,18 +28,18 @@ unique_ptr<SampleStreamParser> open_sample_stream_parser( } } -unique_ptr<SampleOutputStream> open_sample_output_stream( +unique_ptr<SampleConsumer> open_sample_writer( shared_ptr<ostream> output, KeyDictionary &dict, sample_format_type type, sample_output_stream_options options) { if (type == sample_format_type::CSV) { - return make_unique<CsvSampleOutputStream>(output, dict); + return make_unique<CsvWriterSampleConsumer>(output, dict); } else if (type == sample_format_type::KEY_VALUE) { - return make_unique<KeyValueSampleOutputStream>(output, dict); + return make_unique<KeyValueWriterSampleConsumer>(output, dict); } else if (type == sample_format_type::JSON) { - return make_unique<JsonSampleOutputStream>(output, dict); + return make_unique<JsonWriterSampleConsumer>(output, dict); } else if (type == sample_format_type::RRD) { auto of = options.find_option<output_fields_option>(); @@ -47,7 +47,7 @@ unique_ptr<SampleOutputStream> open_sample_output_stream( auto timestamp_key = dict.indexOf(tsf ? tsf.value()->name : "timestamp"); - return make_unique<RrdSampleOutputStream>(output, dict, timestamp_key, of); + return make_unique<RrdWriterSampleConsumer>(output, dict, timestamp_key, of); } else if (type == sample_format_type::SQL) { auto tno = options.find_option<table_name_option>(); @@ -55,30 +55,30 @@ unique_ptr<SampleOutputStream> open_sample_output_stream( throw missing_required_option_error("table name"); } - return make_unique<SqlSampleOutputStream>(move(output), dict, tno.value()->name); + return make_unique<SqlWriterSampleConsumer>(move(output), dict, tno.value()->name); } else { throw sample_exception("No writer for format type: " + to_string(type)); } } -ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying) +ThreadSafeSampleConsumer::ThreadSafeSampleConsumer(unique_ptr<SampleConsumer> underlying) : underlying(move(underlying)) { } -void ThreadSafeSampleOutputStream::write(SampleRecord const &sample) { +void ThreadSafeSampleConsumer::onSample(SampleRecord const &sample) { std::unique_lock<std::mutex> lock(mutex); - underlying->write(sample); + underlying->onSample(sample); } -AddTimestampSampleOutputStream::AddTimestampSampleOutputStream(unique_ptr<SampleOutputStream> underlying, +AddTimestampSampleConsumer::AddTimestampSampleConsumer(unique_ptr<SampleConsumer> underlying, KeyDictionary &dict, const string ×tamp_name) : underlying_(move(underlying)), timestamp_key(dict.indexOf(timestamp_name)) { } -void AddTimestampSampleOutputStream::write(SampleRecord const &sample) { +void AddTimestampSampleConsumer::onSample(SampleRecord const &sample) { if (sample.at(timestamp_key)) { - underlying_->write(sample); + underlying_->onSample(sample); return; } @@ -88,10 +88,10 @@ void AddTimestampSampleOutputStream::write(SampleRecord const &sample) { SampleRecord copy = sample; copy.set(timestamp_key, timestamp_s); - underlying_->write(copy); + underlying_->onSample(copy); } -void VectorSampleOutputStream::write(SampleRecord const &sample) { +void VectorSampleOutputStream::onSample(SampleRecord const &sample) { if (sample.empty()) { return; } @@ -99,11 +99,11 @@ void VectorSampleOutputStream::write(SampleRecord const &sample) { samples.emplace_back(sample); } -CsvSampleOutputStream::CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict) +CsvWriterSampleConsumer::CsvWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict) : stream(move(stream)), headerWritten(false), dict(dict) { } -void CsvSampleOutputStream::write(SampleRecord const &sample) { +void CsvWriterSampleConsumer::onSample(SampleRecord const &sample) { // Skip empty records if (sample.empty()) { return; @@ -151,7 +151,7 @@ void CsvSampleOutputStream::write(SampleRecord const &sample) { s << endl << flush; } -void CsvSampleOutputStream::writeHeader() { +void CsvWriterSampleConsumer::writeHeader() { auto &s = *stream; auto i = dict.begin(); @@ -168,11 +168,11 @@ void CsvSampleOutputStream::writeHeader() { s << endl << flush; } -JsonSampleOutputStream::JsonSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict) : +JsonWriterSampleConsumer::JsonWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict) : dict(dict), stream(move(stream)) { } -void JsonSampleOutputStream::write(SampleRecord const &sample) { +void JsonWriterSampleConsumer::onSample(SampleRecord const &sample) { // Skip empty records if (sample.empty()) { return; @@ -205,11 +205,11 @@ void JsonSampleOutputStream::write(SampleRecord const &sample) { *stream.get() << doc << endl << flush; } -KeyValueSampleOutputStream::KeyValueSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict) : +KeyValueWriterSampleConsumer::KeyValueWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict) : dict(dict), stream(move(stream)) { } -void KeyValueSampleOutputStream::write(SampleRecord const &sample) { +void KeyValueWriterSampleConsumer::onSample(SampleRecord const &sample) { // Skip empty records if (sample.empty()) { return; @@ -253,7 +253,7 @@ void KeyValueSampleOutputStream::write(SampleRecord const &sample) { *s << endl << flush; } -RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr<ostream> stream, +RrdWriterSampleConsumer::RrdWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict, const SampleKey *timestamp_key, o<output_fields_option *> output_fields) : @@ -270,7 +270,7 @@ RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr<ostream> stream, } } -void RrdSampleOutputStream::write(SampleRecord const &sample) { +void RrdWriterSampleConsumer::onSample(SampleRecord const &sample) { // Skip empty records if (sample.empty()) { return; @@ -309,11 +309,11 @@ void RrdSampleOutputStream::write(SampleRecord const &sample) { s << endl << flush; } -SqlSampleOutputStream::SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name) : +SqlWriterSampleConsumer::SqlWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name) : dict(dict), stream(move(stream)), table_name(table_name) { } -void SqlSampleOutputStream::write(SampleRecord const &sample) { +void SqlWriterSampleConsumer::onSample(SampleRecord const &sample) { string fs, vs; fs.reserve(1024); @@ -425,10 +425,10 @@ void KeyValueSampleStreamParser::process_line(shared_ptr<vector<uint8_t>> &packe sample.set(key, value); } - output->write(sample); + output->onSample(sample); } -AutoSampleParser::AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) : +AutoSampleParser::AutoSampleParser(shared_ptr<SampleConsumer> output, KeyDictionary &dict) : SampleStreamParser(sample_format_type::AUTO, dict), keyValueParser(new KeyValueSampleStreamParser(output, dict)) { // Directly select the parser now until we have more than one parser parser = std::move(keyValueParser); |