aboutsummaryrefslogtreecommitdiff
path: root/apps/SoilMoistureIo.h
diff options
context:
space:
mode:
Diffstat (limited to 'apps/SoilMoistureIo.h')
-rw-r--r--apps/SoilMoistureIo.h29
1 files changed, 26 insertions, 3 deletions
diff --git a/apps/SoilMoistureIo.h b/apps/SoilMoistureIo.h
index bdb0932..386296a 100644
--- a/apps/SoilMoistureIo.h
+++ b/apps/SoilMoistureIo.h
@@ -10,6 +10,7 @@
#include <boost/optional.hpp>
#include <boost/lexical_cast.hpp>
#include <functional>
+#include <mutex>
// TODO: rename to trygvis::sample
namespace trygvis {
@@ -44,6 +45,7 @@ class KeyDictionary;
class SampleKey;
+// TODO: rename to open_sample_stream_parser
unique_ptr<SampleStreamParser> open_sample_input_stream(
shared_ptr<SampleOutputStream> output,
KeyDictionary &dict,
@@ -96,6 +98,13 @@ unique_ptr<SampleOutputStream> open_sample_output_stream(
return open_sample_output_stream(output, dict, type);
}
+class ThreadSafeSampleOutputStream;
+
+static inline
+unique_ptr<ThreadSafeSampleOutputStream> thread_safe_sample_output_stream(unique_ptr<SampleOutputStream> underlying) {
+ return make_unique<ThreadSafeSampleOutputStream>(move(underlying));
+};
+
class sample_exception : public runtime_error {
public:
sample_exception(const string &what) : runtime_error(what) {
@@ -288,6 +297,20 @@ public:
vector<SampleRecord> samples;
};
+class ThreadSafeSampleOutputStream : public SampleOutputStream {
+public:
+ ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying);
+
+ ~ThreadSafeSampleOutputStream() {
+ }
+
+ void write(SampleRecord const &sample) override;
+
+private:
+ unique_ptr<SampleOutputStream> underlying;
+ std::mutex mutex;
+};
+
class CsvSampleOutputStream : public SampleOutputStream {
public:
CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict);
@@ -368,10 +391,10 @@ protected:
}
};
-class KeyValueSampleParser : public SampleStreamParser {
+class KeyValueSampleStreamParser : public SampleStreamParser {
public:
- KeyValueSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) :
+ KeyValueSampleStreamParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) :
SampleStreamParser(sample_format_type::CSV), output(output), dict(dict),
line(make_shared<vector<uint8_t>>()) {
}
@@ -393,7 +416,7 @@ public:
private:
unique_ptr<SampleStreamParser> parser;
- unique_ptr<KeyValueSampleParser> keyValueParser;
+ unique_ptr<KeyValueSampleStreamParser> keyValueParser;
public:
virtual void process(mutable_buffers_1 buffer);
};