aboutsummaryrefslogtreecommitdiff
path: root/apps
diff options
context:
space:
mode:
Diffstat (limited to 'apps')
-rw-r--r--apps/CMakeLists.txt1
-rw-r--r--apps/SoilMoistureIo.cpp19
-rw-r--r--apps/SoilMoistureIo.h29
-rw-r--r--apps/sample-convert.cpp4
-rw-r--r--apps/sm-serial-read-all.cpp168
-rw-r--r--apps/sm-serial-read.cpp10
6 files changed, 217 insertions, 14 deletions
diff --git a/apps/CMakeLists.txt b/apps/CMakeLists.txt
index 99bef28..3891d79 100644
--- a/apps/CMakeLists.txt
+++ b/apps/CMakeLists.txt
@@ -7,6 +7,7 @@ list(APPEND APPS sm-db-insert)
list(APPEND APPS sm-db-select)
list(APPEND APPS sm-get-value)
list(APPEND APPS sm-serial-read)
+list(APPEND APPS sm-serial-read-all)
add_library(trygvis-apps
SoilMoisture.cpp
diff --git a/apps/SoilMoistureIo.cpp b/apps/SoilMoistureIo.cpp
index 4d2e03d..1d9281b 100644
--- a/apps/SoilMoistureIo.cpp
+++ b/apps/SoilMoistureIo.cpp
@@ -278,7 +278,7 @@ void SqlSampleOutputStream::write(SampleRecord const &values) {
// (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl;
}
-void KeyValueSampleParser::process(mutable_buffers_1 buffer) {
+void KeyValueSampleStreamParser::process(mutable_buffers_1 buffer) {
size_t size = buffer_size(buffer);
@@ -303,7 +303,7 @@ void KeyValueSampleParser::process(mutable_buffers_1 buffer) {
}
-void KeyValueSampleParser::process_line(shared_ptr<vector<uint8_t>> packet) {
+void KeyValueSampleStreamParser::process_line(shared_ptr<vector<uint8_t>> packet) {
auto timestamp = std::chrono::system_clock::now().time_since_epoch().count();
auto s = std::string((char *) packet->data(), packet->size());
@@ -332,7 +332,7 @@ void KeyValueSampleParser::process_line(shared_ptr<vector<uint8_t>> packet) {
}
AutoSampleParser::AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) :
- SampleStreamParser(sample_format_type::AUTO), keyValueParser(new KeyValueSampleParser(output, dict)) {
+ SampleStreamParser(sample_format_type::AUTO), keyValueParser(new KeyValueSampleStreamParser(output, dict)) {
// Directly select the parser now until we have more than one parser
parser = std::move(keyValueParser);
type_ = sample_format_type::KEY_VALUE;
@@ -393,7 +393,7 @@ unique_ptr<SampleStreamParser> open_sample_input_stream(
KeyDictionary &dict,
sample_format_type type) {
if (type == sample_format_type::KEY_VALUE) {
- return make_unique<KeyValueSampleParser>(output, dict);
+ return make_unique<KeyValueSampleStreamParser>(output, dict);
} else if (type == sample_format_type::AUTO) {
return make_unique<AutoSampleParser>(output, dict);
} else {
@@ -441,5 +441,16 @@ unique_ptr<SampleOutputStream> open_sample_output_stream(
}
}
+//template<typename T>
+ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying) : underlying(move(underlying)) {
+}
+
+//template<typename T>
+void ThreadSafeSampleOutputStream::write(SampleRecord const &sample) {
+ std::unique_lock<std::mutex> lock(mutex);
+
+ underlying->write(sample);
+}
+
}
}
diff --git a/apps/SoilMoistureIo.h b/apps/SoilMoistureIo.h
index bdb0932..386296a 100644
--- a/apps/SoilMoistureIo.h
+++ b/apps/SoilMoistureIo.h
@@ -10,6 +10,7 @@
#include <boost/optional.hpp>
#include <boost/lexical_cast.hpp>
#include <functional>
+#include <mutex>
// TODO: rename to trygvis::sample
namespace trygvis {
@@ -44,6 +45,7 @@ class KeyDictionary;
class SampleKey;
+// TODO: rename to open_sample_stream_parser
unique_ptr<SampleStreamParser> open_sample_input_stream(
shared_ptr<SampleOutputStream> output,
KeyDictionary &dict,
@@ -96,6 +98,13 @@ unique_ptr<SampleOutputStream> open_sample_output_stream(
return open_sample_output_stream(output, dict, type);
}
+class ThreadSafeSampleOutputStream;
+
+static inline
+unique_ptr<ThreadSafeSampleOutputStream> thread_safe_sample_output_stream(unique_ptr<SampleOutputStream> underlying) {
+ return make_unique<ThreadSafeSampleOutputStream>(move(underlying));
+};
+
class sample_exception : public runtime_error {
public:
sample_exception(const string &what) : runtime_error(what) {
@@ -288,6 +297,20 @@ public:
vector<SampleRecord> samples;
};
+class ThreadSafeSampleOutputStream : public SampleOutputStream {
+public:
+ ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying);
+
+ ~ThreadSafeSampleOutputStream() {
+ }
+
+ void write(SampleRecord const &sample) override;
+
+private:
+ unique_ptr<SampleOutputStream> underlying;
+ std::mutex mutex;
+};
+
class CsvSampleOutputStream : public SampleOutputStream {
public:
CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict);
@@ -368,10 +391,10 @@ protected:
}
};
-class KeyValueSampleParser : public SampleStreamParser {
+class KeyValueSampleStreamParser : public SampleStreamParser {
public:
- KeyValueSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) :
+ KeyValueSampleStreamParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) :
SampleStreamParser(sample_format_type::CSV), output(output), dict(dict),
line(make_shared<vector<uint8_t>>()) {
}
@@ -393,7 +416,7 @@ public:
private:
unique_ptr<SampleStreamParser> parser;
- unique_ptr<KeyValueSampleParser> keyValueParser;
+ unique_ptr<KeyValueSampleStreamParser> keyValueParser;
public:
virtual void process(mutable_buffers_1 buffer);
};
diff --git a/apps/sample-convert.cpp b/apps/sample-convert.cpp
index d1c53fb..9e2ad55 100644
--- a/apps/sample-convert.cpp
+++ b/apps/sample-convert.cpp
@@ -79,9 +79,9 @@ public:
options.push_back(&fs);
}
- shared_ptr <SampleOutputStream> output = open_sample_output_stream(outputStream, dict, output_format, options);
+ shared_ptr<SampleOutputStream> output = open_sample_output_stream(outputStream, dict, output_format, options);
- auto input = make_shared<KeyValueSampleParser>(output, dict);
+ auto input = make_shared<KeyValueSampleStreamParser>(output, dict);
char data[100];
while (!inputStream->eof()) {
diff --git a/apps/sm-serial-read-all.cpp b/apps/sm-serial-read-all.cpp
new file mode 100644
index 0000000..8050c85
--- /dev/null
+++ b/apps/sm-serial-read-all.cpp
@@ -0,0 +1,168 @@
+#include "SoilMoistureIo.h"
+#include "json.hpp"
+#include "apps.h"
+#include <thread>
+#include <boost/asio/serial_port.hpp>
+
+namespace trygvis {
+namespace apps {
+
+using namespace boost::asio;
+using namespace std;
+using namespace std::chrono;
+using namespace trygvis::apps;
+using namespace trygvis::soil_moisture;
+namespace po = boost::program_options;
+using json = nlohmann::json;
+
+sample_format_type format;
+string hostname = get_hostname();
+
+class port_handler {
+public:
+ port_handler(string &port_name, serial_port &port, unique_ptr<SampleStreamParser> parser) :
+ port_name(port_name), port(port), parser(move(parser)) {
+ }
+
+ void run() {
+ auto packet = make_shared<vector<uint8_t>>(1024);
+
+ while (port.is_open()) {
+ std::size_t some = port.read_some(buffer);
+
+ mutable_buffers_1 chunk = boost::asio::buffer(data, some);
+ parser->process(chunk);
+ }
+
+ cerr << "port closed" << endl;
+ dead_ = true;
+ }
+
+ bool dead() const {
+ return dead_;
+ }
+
+ const string port_name;
+
+private:
+ static const size_t size = 1024;
+ bool dead_ = false;
+ serial_port &port;
+ uint8_t data[size];
+ mutable_buffers_1 buffer = boost::asio::buffer(data, size);
+
+ unique_ptr<SampleStreamParser> parser;
+};
+
+// This only supports Linux
+#if 1
+
+#include <sys/types.h>
+#include <dirent.h>
+
+vector<string> &&find_ports() {
+ DIR *dir = opendir("/dev");
+ vector <string> ports;
+
+ if (!dir) {
+ return move(ports);
+ }
+
+ struct dirent *entry;
+ while ((entry = readdir(dir)) != NULL) {
+ string name = entry->d_name;
+
+ if (name.find("ttyS") || name.find("ttyUSB") || name.find("ttyACM")) {
+ ports.emplace_back("/dev/" + name);
+ }
+ }
+
+ closedir(dir);
+
+ return move(ports);
+}
+
+#endif
+
+class sm_serial_read_all : public app {
+
+public:
+ bool run = true;
+
+ void add_options(po::options_description_easy_init &options) override {
+ options
+ ("help", "produce help message")
+ ("port", po::value<string>()->required(), "The serial port to read")
+ ("format", po::value<sample_format_type>(&format)->default_value(sample_format_type::KEY_VALUE), "Output format");
+ }
+
+ static
+ void handler_thread(port_handler *handler) {
+ handler->run();
+ }
+
+ int main(app_execution &execution) override {
+ auto desc = execution.desc;
+ auto vm = execution.vm;
+
+ io_service io_service;
+ uint32_t baud_rate = 115200;
+
+ map<string, port_handler *> active_ports;
+
+ KeyDictionary outputDict;
+ auto output_stream = shared_ptr<ostream>(&cout, noop_deleter);
+ auto output = open_sample_output_stream(output_stream, outputDict, format);
+ auto tso = thread_safe_sample_output_stream(move(output));
+ shared_ptr<SampleOutputStream> thread_safe_output(move(tso));
+
+ while (run) {
+ // Port cleanup
+ for (auto it = active_ports.begin(); it != active_ports.end(); ++it) {
+ if (it->second->dead()) {
+ cerr << "Removing dead port " << it->second->port_name << endl;
+ it = active_ports.erase(it);
+ }
+ }
+
+ // Discover new ports
+ auto ports = find_ports();
+
+ for (auto port_name : ports) {
+ if (active_ports.find(port_name) != active_ports.end()) {
+ cerr << "New port " << port_name;
+
+ serial_port port(io_service);
+ port.open(port_name);
+ port.set_option(serial_port_base::baud_rate(baud_rate));
+ port.set_option(serial_port_base::character_size(8));
+ port.set_option(serial_port_base::parity(serial_port_base::parity::none));
+ port.set_option(serial_port_base::stop_bits(serial_port_base::stop_bits::one));
+ port.set_option(serial_port_base::flow_control(serial_port_base::flow_control::none));
+
+ KeyDictionary parserDict; // TODO: dette feiler
+ unique_ptr<SampleStreamParser> parser = open_sample_input_stream(thread_safe_output, parserDict, sample_format_type::KEY_VALUE);
+
+ auto handler = new port_handler(port_name, port, move(parser));
+ active_ports[port_name] = handler;
+ std::thread thread(handler_thread, handler);
+ }
+ }
+
+ sleep(1);
+ }
+
+ return EXIT_SUCCESS;
+ }
+
+};
+
+}
+}
+
+using namespace trygvis::apps;
+
+int main(int argc, char *argv[]) {
+ sm_serial_read_all app;
+ return launch_app(argc, argv, app);
+}
diff --git a/apps/sm-serial-read.cpp b/apps/sm-serial-read.cpp
index aa997e5..4edd940 100644
--- a/apps/sm-serial-read.cpp
+++ b/apps/sm-serial-read.cpp
@@ -21,8 +21,8 @@ string hostname = get_hostname();
class port_handler {
public:
- port_handler(string device, serial_port &serial_port, shared_ptr<KeyValueSampleParser> input) :
- device(device), port(serial_port), input(input) {
+ port_handler(string port_name, serial_port &serial_port, shared_ptr<KeyValueSampleStreamParser> input) :
+ port_name(port_name), port(serial_port), input(input) {
}
void run() {
@@ -41,12 +41,12 @@ public:
private:
static const size_t size = 1024;
- string device;
+ string port_name;
serial_port &port;
uint8_t data[size];
mutable_buffers_1 buffer = boost::asio::buffer(data, size);
- shared_ptr<KeyValueSampleParser> input;
+ shared_ptr<KeyValueSampleStreamParser> input;
};
class sm_serial_read : public app {
@@ -87,7 +87,7 @@ public:
shared_ptr<ostream> outputStream = shared_ptr<ostream>(&cout, noop_deleter);
shared_ptr<SampleOutputStream> output = open_sample_output_stream(outputStream, dict, format);
- shared_ptr<KeyValueSampleParser> input = make_shared<KeyValueSampleParser>(output, dict);
+ shared_ptr<KeyValueSampleStreamParser> input = make_shared<KeyValueSampleStreamParser>(output, dict);
port_handler(port_name, port, input).run();