aboutsummaryrefslogtreecommitdiff
path: root/apps
diff options
context:
space:
mode:
Diffstat (limited to 'apps')
-rw-r--r--apps/SoilMoistureIo.cpp70
-rw-r--r--apps/SoilMoistureIo.h58
-rw-r--r--apps/sample-convert.cpp24
-rw-r--r--apps/sm-serial-read.cpp70
4 files changed, 151 insertions, 71 deletions
diff --git a/apps/SoilMoistureIo.cpp b/apps/SoilMoistureIo.cpp
index 1280fc6..98a256b 100644
--- a/apps/SoilMoistureIo.cpp
+++ b/apps/SoilMoistureIo.cpp
@@ -2,6 +2,8 @@
#include "json.hpp"
#include <set>
+#include <boost/regex.hpp>
+#include <chrono>
namespace trygvis {
namespace soil_moisture {
@@ -28,7 +30,7 @@ CsvSampleOutputStream::CsvSampleOutputStream(ostream &stream, vector<string> fie
stream << endl;
}
-void CsvSampleOutputStream::write(it values) {
+void CsvSampleOutputStream::write(Sample values) {
auto i = fields.begin();
while (true) {
auto field = *i;
@@ -54,8 +56,8 @@ JsonSampleOutputStream::JsonSampleOutputStream(ostream &stream, vector<string> f
stream(stream), fields(fields) {
}
-void JsonSampleOutputStream::write(it values) {
- json doc;
+void JsonSampleOutputStream::write(Sample values) {
+ json doc({});
for (auto &f: fields) {
auto value = values.find(f);
@@ -72,7 +74,7 @@ SqlSampleOutputStream::SqlSampleOutputStream(ostream &stream, vector<string> fie
stream(stream), fields(fields) {
}
-void SqlSampleOutputStream::write(it values) {
+void SqlSampleOutputStream::write(Sample values) {
auto i = fields.begin();
stringstream fs, vs;
@@ -103,5 +105,65 @@ void SqlSampleOutputStream::write(it values) {
stream << "INSERT INTO (" << fs << ") VALUES(" << vs << ")" << endl;
}
+void CsvParser::process(mutable_buffers_1 buffer) {
+
+ size_t some = buffer_size(buffer);
+ auto data = boost::asio::buffer_cast<const uint8_t *>(buffer);
+
+ for (int i = 0; i < some; i++) {
+ uint8_t b = data[i];
+
+ if (b == packet_delimiter) {
+ process_line(line);
+ line = make_shared<vector<uint8_t>>();
+ } else {
+ line->push_back(b);
+ }
+ }
+
+}
+
+void CsvParser::process_line(shared_ptr<vector<uint8_t>> packet) {
+ auto timestamp = std::chrono::system_clock::now().time_since_epoch().count();
+ auto s = std::string((char *) packet->data(), packet->size());
+// cerr << "packet: " << s << ", size=" << packet->size() << endl;
+
+ static const boost::regex e("([_a-zA-Z0-9]+)=([0-9]+)");
+
+ std::string::const_iterator start = s.begin();
+ std::string::const_iterator end = s.end();
+ boost::match_results<std::string::const_iterator> what;
+ boost::match_flag_type flags = boost::match_default;
+
+ Sample sample;
+
+ while (regex_search(start, end, what, e, flags)) {
+ auto key = static_cast<string>(what[1]);
+ auto value = static_cast<string>(what[2]);
+ start = what[0].second;
+
+// static const string device_type = "serial";
+ map<string, string> values;
+ values[key] = value;
+// values["hostname"] = hostname;
+// values["device"] = device;
+// values["device_type"] = device_type;
+// values["timestamp"] = to_string(timestamp);
+// values["sensor"] = sensor;
+// values["value"] = value;
+
+// cerr << key << " => " << value << endl;
+
+ sample[key] = value;
+
+ flags |= boost::match_prev_avail;
+ flags |= boost::match_not_bob;
+ }
+
+ if (sample.begin() != sample.end()) {
+ output->write(sample);
+ }
+}
+
}
}
diff --git a/apps/SoilMoistureIo.h b/apps/SoilMoistureIo.h
index 739758a..aff7c85 100644
--- a/apps/SoilMoistureIo.h
+++ b/apps/SoilMoistureIo.h
@@ -4,24 +4,54 @@
#include <ostream>
#include <vector>
#include <map>
+#include <memory>
+#include <boost/asio/buffer.hpp>
+#include <functional>
namespace trygvis {
namespace soil_moisture {
using namespace std;
+using namespace boost::asio;
-class SampleOutputStream {
+class Sample {
public:
- typedef map<string, string> it;
+ Sample() : entries() {
+ }
+
+ Sample(map<string, string> entries) : entries(entries) {
+ }
+
+ map<string, string>::iterator find(string &s) {
+ return entries.find(s);
+ }
+
+ map<string, string>::iterator begin() {
+ return entries.begin();
+ }
+
+ map<string, string>::iterator end() {
+ return entries.end();
+ }
- virtual void write(it values) = 0;
+ string& operator[](string key) {
+ return entries[key];
+ }
+
+private:
+ map<string, string> entries;
+};
+
+class SampleOutputStream {
+public:
+ virtual void write(Sample sample) = 0;
};
class CsvSampleOutputStream : public SampleOutputStream {
public:
CsvSampleOutputStream(ostream& stream, vector<string> fields);
- void write(it values);
+ void write(Sample values);
private:
ostream& stream;
@@ -32,7 +62,7 @@ class JsonSampleOutputStream : public SampleOutputStream {
public:
JsonSampleOutputStream(ostream& stream, vector<string> fields);
- void write(it values);
+ void write(Sample values);
private:
ostream& stream;
@@ -43,13 +73,29 @@ class SqlSampleOutputStream : public SampleOutputStream {
public:
SqlSampleOutputStream(ostream& stream, vector<string> fields);
- void write(it values);
+ void write(Sample values);
private:
ostream& stream;
vector<string> fields;
};
+class CsvParser {
+
+public:
+ CsvParser(shared_ptr<SampleOutputStream> output) : output(output), line(make_shared<vector<uint8_t>>()) {
+ }
+
+ void process(mutable_buffers_1 buffer);
+
+private:
+ void process_line(shared_ptr<vector<uint8_t>> packet);
+
+ static const uint8_t packet_delimiter = '\n';
+ shared_ptr<SampleOutputStream> output;
+ shared_ptr<vector<uint8_t>> line;
+};
+
}
}
diff --git a/apps/sample-convert.cpp b/apps/sample-convert.cpp
index 2e448ca..9a0627d 100644
--- a/apps/sample-convert.cpp
+++ b/apps/sample-convert.cpp
@@ -1,6 +1,7 @@
#include "SoilMoistureIo.h"
#include "json.hpp"
#include "apps.h"
+#include <istream>
enum class Format {
PLAIN,
@@ -64,7 +65,7 @@ public:
auto desc = execution.desc;
auto vm = execution.vm;
- shared_ptr<SampleOutputStream> sampleStream;
+ shared_ptr<SampleOutputStream> output;
auto field_names = vector<string>({
"hostname",
@@ -75,13 +76,24 @@ public:
"value"
});
- sampleStream = make_shared<JsonSampleOutputStream>(cout, field_names);
+ field_names = vector<string>({
+ "analog",
+ "dry",
+ "water",
+ "last_watering_started",
+ "last_watering_stopped",
+ "now"
+ });
+
+ output = make_shared<JsonSampleOutputStream>(cout, field_names);
- map<string, string> values;
- values["hostname"] = "my-hostname";
- values["extra"] = "wat";
+ auto input = make_shared<CsvParser>(output);
- sampleStream->write(values);
+ char data[100];
+ while (!cin.eof()) {
+ cin.get(data[0]);
+ input->process(boost::asio::buffer(data, 1));
+ }
return EXIT_SUCCESS;
}
diff --git a/apps/sm-serial-read.cpp b/apps/sm-serial-read.cpp
index d605ae4..955d262 100644
--- a/apps/sm-serial-read.cpp
+++ b/apps/sm-serial-read.cpp
@@ -4,7 +4,6 @@
#include <chrono>
#include <thread>
#include <boost/asio/serial_port.hpp>
-#include <boost/regex.hpp>
enum class Format {
PLAIN,
@@ -60,74 +59,32 @@ string hostname = get_hostname();
class port_handler {
public:
- port_handler(string device, serial_port &serial_port, shared_ptr<SampleOutputStream> stream) :
- device(device), port(serial_port), stream(stream) {
+ port_handler(string device, serial_port &serial_port, shared_ptr<CsvParser> input) :
+ device(device), port(serial_port), input(input) {
}
void run() {
auto packet = make_shared<vector<uint8_t>>(1024);
while (port.is_open()) {
- size_t some = port.read_some(buffer);
- for (int i = 0; i < some; i++) {
- uint8_t b = data[i];
-
- if (b == packet_delimiter) {
- on_packet(packet);
- packet = make_shared<vector<uint8_t>>(1024);
- } else {
- packet->emplace_back(b);
- }
- }
+ std::size_t some = port.read_some(buffer);
+
+ mutable_buffers_1 chunk = boost::asio::buffer(data, some);
+ input->process(chunk);
}
cerr << "port closed" << endl;
}
- void on_packet(shared_ptr<vector<uint8_t>> packet) {
- auto timestamp = std::chrono::system_clock::now().time_since_epoch().count();
- auto s = std::string((char *) packet->data(), packet->size());
- cerr << "packet: " << s << endl;
-
- static const boost::regex e("([a-zA-Z0-9]+)=([0-9]+)");
-
- std::string::const_iterator start = s.begin();
- std::string::const_iterator end = s.end();
- boost::match_results<std::string::const_iterator> what;
- boost::match_flag_type flags = boost::match_default;
-
- while (regex_search(start, end, what, e, flags)) {
- auto sensor = static_cast<string>(what[1]);
- auto value = static_cast<string>(what[2]);
- start = what[0].second;
-
- static const string device_type = "serial";
- map<string, string> values;
- values["hostname"] = hostname;
- values["device_type"] = device_type;
- values["device"] = device;
- values["timestamp"] = to_string(timestamp);
- values["sensor"] = sensor;
- values["value"] = value;
-
-// cerr << sensor << " => " << value << endl;
-
- stream->write(values);
-
- flags |= boost::match_prev_avail;
- flags |= boost::match_not_bob;
- }
- }
private:
static const size_t size = 1024;
- static const uint8_t packet_delimiter = '\n';
string device;
serial_port &port;
uint8_t data[size];
mutable_buffers_1 buffer = boost::asio::buffer(data, size);
- shared_ptr<SampleOutputStream> stream;
+ shared_ptr<CsvParser> input;
};
class sm_serial_read : public app {
@@ -171,20 +128,23 @@ public:
"sensor",
"value"
});
- shared_ptr <SampleOutputStream> sampleStream;
+
+ shared_ptr<SampleOutputStream> output;
if (format == Format::JSON) {
- sampleStream = make_shared<JsonSampleOutputStream>(cout, field_names);
+ output = make_shared<JsonSampleOutputStream>(cout, field_names);
} else if (format == Format::SQL) {
- sampleStream = make_shared<SqlSampleOutputStream>(cout, field_names);
+ output = make_shared<SqlSampleOutputStream>(cout, field_names);
} else if (format == Format::PLAIN) {
- sampleStream = make_shared<CsvSampleOutputStream>(cout, field_names);
+ output = make_shared<CsvSampleOutputStream>(cout, field_names);
} else {
cerr << "Unsupported format: " << boost::lexical_cast<string>(format) << endl;
return EXIT_FAILURE;
}
- port_handler(port_name, port, sampleStream).run();
+ shared_ptr<CsvParser> input = make_shared<CsvParser>(output);
+
+ port_handler(port_name, port, input).run();
return EXIT_SUCCESS;
}