From 0dc2cc6503386c809266ad6564ba675803cf8cc7 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 22 Mar 2015 14:27:46 +0100 Subject: o First version of a tool to continuously read and discover serial ports. --- apps/CMakeLists.txt | 1 + apps/SoilMoistureIo.cpp | 19 +++-- apps/SoilMoistureIo.h | 29 +++++++- apps/sample-convert.cpp | 4 +- apps/sm-serial-read-all.cpp | 168 ++++++++++++++++++++++++++++++++++++++++++++ apps/sm-serial-read.cpp | 10 +-- 6 files changed, 217 insertions(+), 14 deletions(-) create mode 100644 apps/sm-serial-read-all.cpp (limited to 'apps') diff --git a/apps/CMakeLists.txt b/apps/CMakeLists.txt index 99bef28..3891d79 100644 --- a/apps/CMakeLists.txt +++ b/apps/CMakeLists.txt @@ -7,6 +7,7 @@ list(APPEND APPS sm-db-insert) list(APPEND APPS sm-db-select) list(APPEND APPS sm-get-value) list(APPEND APPS sm-serial-read) +list(APPEND APPS sm-serial-read-all) add_library(trygvis-apps SoilMoisture.cpp diff --git a/apps/SoilMoistureIo.cpp b/apps/SoilMoistureIo.cpp index 4d2e03d..1d9281b 100644 --- a/apps/SoilMoistureIo.cpp +++ b/apps/SoilMoistureIo.cpp @@ -278,7 +278,7 @@ void SqlSampleOutputStream::write(SampleRecord const &values) { // (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl; } -void KeyValueSampleParser::process(mutable_buffers_1 buffer) { +void KeyValueSampleStreamParser::process(mutable_buffers_1 buffer) { size_t size = buffer_size(buffer); @@ -303,7 +303,7 @@ void KeyValueSampleParser::process(mutable_buffers_1 buffer) { } -void KeyValueSampleParser::process_line(shared_ptr> packet) { +void KeyValueSampleStreamParser::process_line(shared_ptr> packet) { auto timestamp = std::chrono::system_clock::now().time_since_epoch().count(); auto s = std::string((char *) packet->data(), packet->size()); @@ -332,7 +332,7 @@ void KeyValueSampleParser::process_line(shared_ptr> packet) { } AutoSampleParser::AutoSampleParser(shared_ptr output, KeyDictionary &dict) : - SampleStreamParser(sample_format_type::AUTO), keyValueParser(new KeyValueSampleParser(output, dict)) { + SampleStreamParser(sample_format_type::AUTO), keyValueParser(new KeyValueSampleStreamParser(output, dict)) { // Directly select the parser now until we have more than one parser parser = std::move(keyValueParser); type_ = sample_format_type::KEY_VALUE; @@ -393,7 +393,7 @@ unique_ptr open_sample_input_stream( KeyDictionary &dict, sample_format_type type) { if (type == sample_format_type::KEY_VALUE) { - return make_unique(output, dict); + return make_unique(output, dict); } else if (type == sample_format_type::AUTO) { return make_unique(output, dict); } else { @@ -441,5 +441,16 @@ unique_ptr open_sample_output_stream( } } +//template +ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr underlying) : underlying(move(underlying)) { +} + +//template +void ThreadSafeSampleOutputStream::write(SampleRecord const &sample) { + std::unique_lock lock(mutex); + + underlying->write(sample); +} + } } diff --git a/apps/SoilMoistureIo.h b/apps/SoilMoistureIo.h index bdb0932..386296a 100644 --- a/apps/SoilMoistureIo.h +++ b/apps/SoilMoistureIo.h @@ -10,6 +10,7 @@ #include #include #include +#include // TODO: rename to trygvis::sample namespace trygvis { @@ -44,6 +45,7 @@ class KeyDictionary; class SampleKey; +// TODO: rename to open_sample_stream_parser unique_ptr open_sample_input_stream( shared_ptr output, KeyDictionary &dict, @@ -96,6 +98,13 @@ unique_ptr open_sample_output_stream( return open_sample_output_stream(output, dict, type); } +class ThreadSafeSampleOutputStream; + +static inline +unique_ptr thread_safe_sample_output_stream(unique_ptr underlying) { + return make_unique(move(underlying)); +}; + class sample_exception : public runtime_error { public: sample_exception(const string &what) : runtime_error(what) { @@ -288,6 +297,20 @@ public: vector samples; }; +class ThreadSafeSampleOutputStream : public SampleOutputStream { +public: + ThreadSafeSampleOutputStream(unique_ptr underlying); + + ~ThreadSafeSampleOutputStream() { + } + + void write(SampleRecord const &sample) override; + +private: + unique_ptr underlying; + std::mutex mutex; +}; + class CsvSampleOutputStream : public SampleOutputStream { public: CsvSampleOutputStream(shared_ptr stream, KeyDictionary &dict); @@ -368,10 +391,10 @@ protected: } }; -class KeyValueSampleParser : public SampleStreamParser { +class KeyValueSampleStreamParser : public SampleStreamParser { public: - KeyValueSampleParser(shared_ptr output, KeyDictionary &dict) : + KeyValueSampleStreamParser(shared_ptr output, KeyDictionary &dict) : SampleStreamParser(sample_format_type::CSV), output(output), dict(dict), line(make_shared>()) { } @@ -393,7 +416,7 @@ public: private: unique_ptr parser; - unique_ptr keyValueParser; + unique_ptr keyValueParser; public: virtual void process(mutable_buffers_1 buffer); }; diff --git a/apps/sample-convert.cpp b/apps/sample-convert.cpp index d1c53fb..9e2ad55 100644 --- a/apps/sample-convert.cpp +++ b/apps/sample-convert.cpp @@ -79,9 +79,9 @@ public: options.push_back(&fs); } - shared_ptr output = open_sample_output_stream(outputStream, dict, output_format, options); + shared_ptr output = open_sample_output_stream(outputStream, dict, output_format, options); - auto input = make_shared(output, dict); + auto input = make_shared(output, dict); char data[100]; while (!inputStream->eof()) { diff --git a/apps/sm-serial-read-all.cpp b/apps/sm-serial-read-all.cpp new file mode 100644 index 0000000..8050c85 --- /dev/null +++ b/apps/sm-serial-read-all.cpp @@ -0,0 +1,168 @@ +#include "SoilMoistureIo.h" +#include "json.hpp" +#include "apps.h" +#include +#include + +namespace trygvis { +namespace apps { + +using namespace boost::asio; +using namespace std; +using namespace std::chrono; +using namespace trygvis::apps; +using namespace trygvis::soil_moisture; +namespace po = boost::program_options; +using json = nlohmann::json; + +sample_format_type format; +string hostname = get_hostname(); + +class port_handler { +public: + port_handler(string &port_name, serial_port &port, unique_ptr parser) : + port_name(port_name), port(port), parser(move(parser)) { + } + + void run() { + auto packet = make_shared>(1024); + + while (port.is_open()) { + std::size_t some = port.read_some(buffer); + + mutable_buffers_1 chunk = boost::asio::buffer(data, some); + parser->process(chunk); + } + + cerr << "port closed" << endl; + dead_ = true; + } + + bool dead() const { + return dead_; + } + + const string port_name; + +private: + static const size_t size = 1024; + bool dead_ = false; + serial_port &port; + uint8_t data[size]; + mutable_buffers_1 buffer = boost::asio::buffer(data, size); + + unique_ptr parser; +}; + +// This only supports Linux +#if 1 + +#include +#include + +vector &&find_ports() { + DIR *dir = opendir("/dev"); + vector ports; + + if (!dir) { + return move(ports); + } + + struct dirent *entry; + while ((entry = readdir(dir)) != NULL) { + string name = entry->d_name; + + if (name.find("ttyS") || name.find("ttyUSB") || name.find("ttyACM")) { + ports.emplace_back("/dev/" + name); + } + } + + closedir(dir); + + return move(ports); +} + +#endif + +class sm_serial_read_all : public app { + +public: + bool run = true; + + void add_options(po::options_description_easy_init &options) override { + options + ("help", "produce help message") + ("port", po::value()->required(), "The serial port to read") + ("format", po::value(&format)->default_value(sample_format_type::KEY_VALUE), "Output format"); + } + + static + void handler_thread(port_handler *handler) { + handler->run(); + } + + int main(app_execution &execution) override { + auto desc = execution.desc; + auto vm = execution.vm; + + io_service io_service; + uint32_t baud_rate = 115200; + + map active_ports; + + KeyDictionary outputDict; + auto output_stream = shared_ptr(&cout, noop_deleter); + auto output = open_sample_output_stream(output_stream, outputDict, format); + auto tso = thread_safe_sample_output_stream(move(output)); + shared_ptr thread_safe_output(move(tso)); + + while (run) { + // Port cleanup + for (auto it = active_ports.begin(); it != active_ports.end(); ++it) { + if (it->second->dead()) { + cerr << "Removing dead port " << it->second->port_name << endl; + it = active_ports.erase(it); + } + } + + // Discover new ports + auto ports = find_ports(); + + for (auto port_name : ports) { + if (active_ports.find(port_name) != active_ports.end()) { + cerr << "New port " << port_name; + + serial_port port(io_service); + port.open(port_name); + port.set_option(serial_port_base::baud_rate(baud_rate)); + port.set_option(serial_port_base::character_size(8)); + port.set_option(serial_port_base::parity(serial_port_base::parity::none)); + port.set_option(serial_port_base::stop_bits(serial_port_base::stop_bits::one)); + port.set_option(serial_port_base::flow_control(serial_port_base::flow_control::none)); + + KeyDictionary parserDict; // TODO: dette feiler + unique_ptr parser = open_sample_input_stream(thread_safe_output, parserDict, sample_format_type::KEY_VALUE); + + auto handler = new port_handler(port_name, port, move(parser)); + active_ports[port_name] = handler; + std::thread thread(handler_thread, handler); + } + } + + sleep(1); + } + + return EXIT_SUCCESS; + } + +}; + +} +} + +using namespace trygvis::apps; + +int main(int argc, char *argv[]) { + sm_serial_read_all app; + return launch_app(argc, argv, app); +} diff --git a/apps/sm-serial-read.cpp b/apps/sm-serial-read.cpp index aa997e5..4edd940 100644 --- a/apps/sm-serial-read.cpp +++ b/apps/sm-serial-read.cpp @@ -21,8 +21,8 @@ string hostname = get_hostname(); class port_handler { public: - port_handler(string device, serial_port &serial_port, shared_ptr input) : - device(device), port(serial_port), input(input) { + port_handler(string port_name, serial_port &serial_port, shared_ptr input) : + port_name(port_name), port(serial_port), input(input) { } void run() { @@ -41,12 +41,12 @@ public: private: static const size_t size = 1024; - string device; + string port_name; serial_port &port; uint8_t data[size]; mutable_buffers_1 buffer = boost::asio::buffer(data, size); - shared_ptr input; + shared_ptr input; }; class sm_serial_read : public app { @@ -87,7 +87,7 @@ public: shared_ptr outputStream = shared_ptr(&cout, noop_deleter); shared_ptr output = open_sample_output_stream(outputStream, dict, format); - shared_ptr input = make_shared(output, dict); + shared_ptr input = make_shared(output, dict); port_handler(port_name, port, input).run(); -- cgit v1.2.3