diff options
-rw-r--r-- | apps/SoilMoistureIo.cpp | 70 | ||||
-rw-r--r-- | apps/SoilMoistureIo.h | 58 | ||||
-rw-r--r-- | apps/sample-convert.cpp | 24 | ||||
-rw-r--r-- | apps/sm-serial-read.cpp | 70 |
4 files changed, 151 insertions, 71 deletions
diff --git a/apps/SoilMoistureIo.cpp b/apps/SoilMoistureIo.cpp index 1280fc6..98a256b 100644 --- a/apps/SoilMoistureIo.cpp +++ b/apps/SoilMoistureIo.cpp @@ -2,6 +2,8 @@ #include "json.hpp" #include <set> +#include <boost/regex.hpp> +#include <chrono> namespace trygvis { namespace soil_moisture { @@ -28,7 +30,7 @@ CsvSampleOutputStream::CsvSampleOutputStream(ostream &stream, vector<string> fie stream << endl; } -void CsvSampleOutputStream::write(it values) { +void CsvSampleOutputStream::write(Sample values) { auto i = fields.begin(); while (true) { auto field = *i; @@ -54,8 +56,8 @@ JsonSampleOutputStream::JsonSampleOutputStream(ostream &stream, vector<string> f stream(stream), fields(fields) { } -void JsonSampleOutputStream::write(it values) { - json doc; +void JsonSampleOutputStream::write(Sample values) { + json doc({}); for (auto &f: fields) { auto value = values.find(f); @@ -72,7 +74,7 @@ SqlSampleOutputStream::SqlSampleOutputStream(ostream &stream, vector<string> fie stream(stream), fields(fields) { } -void SqlSampleOutputStream::write(it values) { +void SqlSampleOutputStream::write(Sample values) { auto i = fields.begin(); stringstream fs, vs; @@ -103,5 +105,65 @@ void SqlSampleOutputStream::write(it values) { stream << "INSERT INTO (" << fs << ") VALUES(" << vs << ")" << endl; } +void CsvParser::process(mutable_buffers_1 buffer) { + + size_t some = buffer_size(buffer); + auto data = boost::asio::buffer_cast<const uint8_t *>(buffer); + + for (int i = 0; i < some; i++) { + uint8_t b = data[i]; + + if (b == packet_delimiter) { + process_line(line); + line = make_shared<vector<uint8_t>>(); + } else { + line->push_back(b); + } + } + +} + +void CsvParser::process_line(shared_ptr<vector<uint8_t>> packet) { + auto timestamp = std::chrono::system_clock::now().time_since_epoch().count(); + auto s = std::string((char *) packet->data(), packet->size()); +// cerr << "packet: " << s << ", size=" << packet->size() << endl; + + static const boost::regex e("([_a-zA-Z0-9]+)=([0-9]+)"); + + std::string::const_iterator start = s.begin(); + std::string::const_iterator end = s.end(); + boost::match_results<std::string::const_iterator> what; + boost::match_flag_type flags = boost::match_default; + + Sample sample; + + while (regex_search(start, end, what, e, flags)) { + auto key = static_cast<string>(what[1]); + auto value = static_cast<string>(what[2]); + start = what[0].second; + +// static const string device_type = "serial"; + map<string, string> values; + values[key] = value; +// values["hostname"] = hostname; +// values["device"] = device; +// values["device_type"] = device_type; +// values["timestamp"] = to_string(timestamp); +// values["sensor"] = sensor; +// values["value"] = value; + +// cerr << key << " => " << value << endl; + + sample[key] = value; + + flags |= boost::match_prev_avail; + flags |= boost::match_not_bob; + } + + if (sample.begin() != sample.end()) { + output->write(sample); + } +} + } } diff --git a/apps/SoilMoistureIo.h b/apps/SoilMoistureIo.h index 739758a..aff7c85 100644 --- a/apps/SoilMoistureIo.h +++ b/apps/SoilMoistureIo.h @@ -4,24 +4,54 @@ #include <ostream> #include <vector> #include <map> +#include <memory> +#include <boost/asio/buffer.hpp> +#include <functional> namespace trygvis { namespace soil_moisture { using namespace std; +using namespace boost::asio; -class SampleOutputStream { +class Sample { public: - typedef map<string, string> it; + Sample() : entries() { + } + + Sample(map<string, string> entries) : entries(entries) { + } + + map<string, string>::iterator find(string &s) { + return entries.find(s); + } + + map<string, string>::iterator begin() { + return entries.begin(); + } + + map<string, string>::iterator end() { + return entries.end(); + } - virtual void write(it values) = 0; + string& operator[](string key) { + return entries[key]; + } + +private: + map<string, string> entries; +}; + +class SampleOutputStream { +public: + virtual void write(Sample sample) = 0; }; class CsvSampleOutputStream : public SampleOutputStream { public: CsvSampleOutputStream(ostream& stream, vector<string> fields); - void write(it values); + void write(Sample values); private: ostream& stream; @@ -32,7 +62,7 @@ class JsonSampleOutputStream : public SampleOutputStream { public: JsonSampleOutputStream(ostream& stream, vector<string> fields); - void write(it values); + void write(Sample values); private: ostream& stream; @@ -43,13 +73,29 @@ class SqlSampleOutputStream : public SampleOutputStream { public: SqlSampleOutputStream(ostream& stream, vector<string> fields); - void write(it values); + void write(Sample values); private: ostream& stream; vector<string> fields; }; +class CsvParser { + +public: + CsvParser(shared_ptr<SampleOutputStream> output) : output(output), line(make_shared<vector<uint8_t>>()) { + } + + void process(mutable_buffers_1 buffer); + +private: + void process_line(shared_ptr<vector<uint8_t>> packet); + + static const uint8_t packet_delimiter = '\n'; + shared_ptr<SampleOutputStream> output; + shared_ptr<vector<uint8_t>> line; +}; + } } diff --git a/apps/sample-convert.cpp b/apps/sample-convert.cpp index 2e448ca..9a0627d 100644 --- a/apps/sample-convert.cpp +++ b/apps/sample-convert.cpp @@ -1,6 +1,7 @@ #include "SoilMoistureIo.h" #include "json.hpp" #include "apps.h" +#include <istream> enum class Format { PLAIN, @@ -64,7 +65,7 @@ public: auto desc = execution.desc; auto vm = execution.vm; - shared_ptr<SampleOutputStream> sampleStream; + shared_ptr<SampleOutputStream> output; auto field_names = vector<string>({ "hostname", @@ -75,13 +76,24 @@ public: "value" }); - sampleStream = make_shared<JsonSampleOutputStream>(cout, field_names); + field_names = vector<string>({ + "analog", + "dry", + "water", + "last_watering_started", + "last_watering_stopped", + "now" + }); + + output = make_shared<JsonSampleOutputStream>(cout, field_names); - map<string, string> values; - values["hostname"] = "my-hostname"; - values["extra"] = "wat"; + auto input = make_shared<CsvParser>(output); - sampleStream->write(values); + char data[100]; + while (!cin.eof()) { + cin.get(data[0]); + input->process(boost::asio::buffer(data, 1)); + } return EXIT_SUCCESS; } diff --git a/apps/sm-serial-read.cpp b/apps/sm-serial-read.cpp index d605ae4..955d262 100644 --- a/apps/sm-serial-read.cpp +++ b/apps/sm-serial-read.cpp @@ -4,7 +4,6 @@ #include <chrono> #include <thread> #include <boost/asio/serial_port.hpp> -#include <boost/regex.hpp> enum class Format { PLAIN, @@ -60,74 +59,32 @@ string hostname = get_hostname(); class port_handler { public: - port_handler(string device, serial_port &serial_port, shared_ptr<SampleOutputStream> stream) : - device(device), port(serial_port), stream(stream) { + port_handler(string device, serial_port &serial_port, shared_ptr<CsvParser> input) : + device(device), port(serial_port), input(input) { } void run() { auto packet = make_shared<vector<uint8_t>>(1024); while (port.is_open()) { - size_t some = port.read_some(buffer); - for (int i = 0; i < some; i++) { - uint8_t b = data[i]; - - if (b == packet_delimiter) { - on_packet(packet); - packet = make_shared<vector<uint8_t>>(1024); - } else { - packet->emplace_back(b); - } - } + std::size_t some = port.read_some(buffer); + + mutable_buffers_1 chunk = boost::asio::buffer(data, some); + input->process(chunk); } cerr << "port closed" << endl; } - void on_packet(shared_ptr<vector<uint8_t>> packet) { - auto timestamp = std::chrono::system_clock::now().time_since_epoch().count(); - auto s = std::string((char *) packet->data(), packet->size()); - cerr << "packet: " << s << endl; - - static const boost::regex e("([a-zA-Z0-9]+)=([0-9]+)"); - - std::string::const_iterator start = s.begin(); - std::string::const_iterator end = s.end(); - boost::match_results<std::string::const_iterator> what; - boost::match_flag_type flags = boost::match_default; - - while (regex_search(start, end, what, e, flags)) { - auto sensor = static_cast<string>(what[1]); - auto value = static_cast<string>(what[2]); - start = what[0].second; - - static const string device_type = "serial"; - map<string, string> values; - values["hostname"] = hostname; - values["device_type"] = device_type; - values["device"] = device; - values["timestamp"] = to_string(timestamp); - values["sensor"] = sensor; - values["value"] = value; - -// cerr << sensor << " => " << value << endl; - - stream->write(values); - - flags |= boost::match_prev_avail; - flags |= boost::match_not_bob; - } - } private: static const size_t size = 1024; - static const uint8_t packet_delimiter = '\n'; string device; serial_port &port; uint8_t data[size]; mutable_buffers_1 buffer = boost::asio::buffer(data, size); - shared_ptr<SampleOutputStream> stream; + shared_ptr<CsvParser> input; }; class sm_serial_read : public app { @@ -171,20 +128,23 @@ public: "sensor", "value" }); - shared_ptr <SampleOutputStream> sampleStream; + + shared_ptr<SampleOutputStream> output; if (format == Format::JSON) { - sampleStream = make_shared<JsonSampleOutputStream>(cout, field_names); + output = make_shared<JsonSampleOutputStream>(cout, field_names); } else if (format == Format::SQL) { - sampleStream = make_shared<SqlSampleOutputStream>(cout, field_names); + output = make_shared<SqlSampleOutputStream>(cout, field_names); } else if (format == Format::PLAIN) { - sampleStream = make_shared<CsvSampleOutputStream>(cout, field_names); + output = make_shared<CsvSampleOutputStream>(cout, field_names); } else { cerr << "Unsupported format: " << boost::lexical_cast<string>(format) << endl; return EXIT_FAILURE; } - port_handler(port_name, port, sampleStream).run(); + shared_ptr<CsvParser> input = make_shared<CsvParser>(output); + + port_handler(port_name, port, input).run(); return EXIT_SUCCESS; } |