diff options
Diffstat (limited to 'apps/SoilMoistureIo.h')
-rw-r--r-- | apps/SoilMoistureIo.h | 29 |
1 files changed, 26 insertions, 3 deletions
diff --git a/apps/SoilMoistureIo.h b/apps/SoilMoistureIo.h index bdb0932..386296a 100644 --- a/apps/SoilMoistureIo.h +++ b/apps/SoilMoistureIo.h @@ -10,6 +10,7 @@ #include <boost/optional.hpp> #include <boost/lexical_cast.hpp> #include <functional> +#include <mutex> // TODO: rename to trygvis::sample namespace trygvis { @@ -44,6 +45,7 @@ class KeyDictionary; class SampleKey; +// TODO: rename to open_sample_stream_parser unique_ptr<SampleStreamParser> open_sample_input_stream( shared_ptr<SampleOutputStream> output, KeyDictionary &dict, @@ -96,6 +98,13 @@ unique_ptr<SampleOutputStream> open_sample_output_stream( return open_sample_output_stream(output, dict, type); } +class ThreadSafeSampleOutputStream; + +static inline +unique_ptr<ThreadSafeSampleOutputStream> thread_safe_sample_output_stream(unique_ptr<SampleOutputStream> underlying) { + return make_unique<ThreadSafeSampleOutputStream>(move(underlying)); +}; + class sample_exception : public runtime_error { public: sample_exception(const string &what) : runtime_error(what) { @@ -288,6 +297,20 @@ public: vector<SampleRecord> samples; }; +class ThreadSafeSampleOutputStream : public SampleOutputStream { +public: + ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying); + + ~ThreadSafeSampleOutputStream() { + } + + void write(SampleRecord const &sample) override; + +private: + unique_ptr<SampleOutputStream> underlying; + std::mutex mutex; +}; + class CsvSampleOutputStream : public SampleOutputStream { public: CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict); @@ -368,10 +391,10 @@ protected: } }; -class KeyValueSampleParser : public SampleStreamParser { +class KeyValueSampleStreamParser : public SampleStreamParser { public: - KeyValueSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) : + KeyValueSampleStreamParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) : SampleStreamParser(sample_format_type::CSV), output(output), dict(dict), line(make_shared<vector<uint8_t>>()) { } @@ -393,7 +416,7 @@ public: private: unique_ptr<SampleStreamParser> parser; - unique_ptr<KeyValueSampleParser> keyValueParser; + unique_ptr<KeyValueSampleStreamParser> keyValueParser; public: virtual void process(mutable_buffers_1 buffer); }; |