aboutsummaryrefslogtreecommitdiff
path: root/apps
diff options
context:
space:
mode:
Diffstat (limited to 'apps')
-rw-r--r--apps/CMakeLists.txt13
-rw-r--r--apps/SoilMoistureIo.cpp122
-rw-r--r--apps/SoilMoistureIo.h113
-rw-r--r--apps/apps.h3
-rw-r--r--apps/sample-convert.cpp20
-rw-r--r--apps/sample-timestamp.cpp156
-rw-r--r--apps/sm-serial-read.cpp15
7 files changed, 371 insertions, 71 deletions
diff --git a/apps/CMakeLists.txt b/apps/CMakeLists.txt
index e2082e1..5510cb3 100644
--- a/apps/CMakeLists.txt
+++ b/apps/CMakeLists.txt
@@ -1,6 +1,7 @@
list(APPEND APPS log4cplus-test)
list(APPEND APPS ble-inspect-device)
list(APPEND APPS sample-convert)
+list(APPEND APPS sample-timestamp)
list(APPEND APPS sm-db-insert)
list(APPEND APPS sm-db-select)
list(APPEND APPS sm-get-value)
@@ -32,19 +33,9 @@ if(LOG4CPLUS_LIBRARIES MATCHES NOTFOUND)
message(FATAL_ERROR "Could not find log4cplus library files")
endif()
-include(ExternalProject)
-ExternalProject_Add(
- JSON
- PREFIX json
- GIT_REPOSITORY https://github.com/nlohmann/json.git
- GIT_TAG ec42245951fceb7594bfb24746c7449986c3c2a4
- CONFIGURE_COMMAND ""
- BUILD_COMMAND ""
- INSTALL_COMMAND "")
-
foreach(app ${APPS})
include_directories("${PROJECT_SOURCE_DIR}/include")
- include_directories("${CMAKE_BINARY_DIR}/apps/json/src/JSON/src")
+ include_directories("${PROJECT_SOURCE_DIR}/json/src")
add_executable(${app} ${app}.cpp)
diff --git a/apps/SoilMoistureIo.cpp b/apps/SoilMoistureIo.cpp
index fe485b3..315f48d 100644
--- a/apps/SoilMoistureIo.cpp
+++ b/apps/SoilMoistureIo.cpp
@@ -11,12 +11,16 @@ namespace soil_moisture {
using namespace std;
using json = nlohmann::json;
-CsvSampleOutputStream::CsvSampleOutputStream(ostream &stream) :
- stream(stream), filterFields(false), headerWritten(false) {
+void VectorSampleOutputStream::write(Sample sample) {
+ samples.emplace_back(sample);
}
-CsvSampleOutputStream::CsvSampleOutputStream(ostream &stream, vector<string> fields) :
- stream(stream), fields(fields), filterFields(true), headerWritten(false) {
+CsvSampleOutputStream::CsvSampleOutputStream(unique_ptr<ostream> stream) :
+ stream(move(stream)), filterFields(false), headerWritten(false) {
+}
+
+CsvSampleOutputStream::CsvSampleOutputStream(unique_ptr<ostream> stream, vector<string> fields) :
+ stream(move(stream)), fields(fields), filterFields(true), headerWritten(false) {
}
void CsvSampleOutputStream::write(Sample values) {
@@ -25,31 +29,33 @@ void CsvSampleOutputStream::write(Sample values) {
headerWritten = true;
}
+ auto &s = *stream.get();
+
if (filterFields) {
auto i = fields.begin();
while (i != fields.end()) {
if (i != fields.begin()) {
- stream << ",";
+ s << ",";
}
auto field = *i++;
auto value = values.find(field);
if (value != values.end()) {
- stream << value->second;
+ s << value->second;
}
}
} else {
for (auto i = values.begin(); i != values.end();) {
- stream << "\"" << (*i).second << "\"";
+ s << "\"" << (*i).second << "\"";
if (++i != values.end()) {
- stream << ",";
+ s << ",";
}
}
}
- stream << endl;
+ s << endl;
}
void CsvSampleOutputStream::writeHeader() {
@@ -57,26 +63,28 @@ void CsvSampleOutputStream::writeHeader() {
return;
}
+ auto &s = *stream.get();
+
auto i = fields.begin();
while (i != fields.end()) {
- stream << *i;
+ s << *i;
i++;
if (i != fields.end()) {
- stream << ",";
+ s << ",";
}
}
- stream << endl;
+ s << endl;
}
-JsonSampleOutputStream::JsonSampleOutputStream(ostream &stream) :
- stream(stream), fields(), filterFields(false) {
+JsonSampleOutputStream::JsonSampleOutputStream(unique_ptr<ostream> stream) :
+ stream(move(stream)), fields(), filterFields(false) {
}
-JsonSampleOutputStream::JsonSampleOutputStream(ostream &stream, vector<string> fields) :
- stream(stream), fields(fields), filterFields(true) {
+JsonSampleOutputStream::JsonSampleOutputStream(unique_ptr<ostream> stream, vector<string> fields) :
+ stream(move(stream)), fields(fields), filterFields(true) {
}
void JsonSampleOutputStream::write(Sample values) {
@@ -96,15 +104,15 @@ void JsonSampleOutputStream::write(Sample values) {
}
}
- stream << doc << endl;
+ *stream.get() << doc << endl;
}
-SqlSampleOutputStream::SqlSampleOutputStream(ostream &stream, string table_name) :
- stream(stream), table_name(table_name), filter_fields(false) {
+SqlSampleOutputStream::SqlSampleOutputStream(unique_ptr<ostream> stream, string table_name) :
+ stream(move(stream)), table_name(table_name), filter_fields(false) {
}
-SqlSampleOutputStream::SqlSampleOutputStream(ostream &stream, string table_name, vector<string> fields) :
- stream(stream), table_name(table_name), fields(fields), filter_fields(true) {
+SqlSampleOutputStream::SqlSampleOutputStream(unique_ptr<ostream> stream, string table_name, vector<string> fields) :
+ stream(move(stream)), table_name(table_name), fields(fields), filter_fields(true) {
}
void SqlSampleOutputStream::write(Sample values) {
@@ -126,7 +134,7 @@ void SqlSampleOutputStream::write(Sample values) {
if (value != values.end()) {
vs += "'" + value->second + "'";
} else {
- vs + "NULL";
+ vs += "NULL";
}
i++;
@@ -151,10 +159,10 @@ void SqlSampleOutputStream::write(Sample values) {
}
}
- stream << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl;
+ (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl;
}
-void CsvParser::process(mutable_buffers_1 buffer) {
+void CsvSampleParser::process(mutable_buffers_1 buffer) {
size_t some = buffer_size(buffer);
auto data = boost::asio::buffer_cast<const uint8_t *>(buffer);
@@ -172,7 +180,7 @@ void CsvParser::process(mutable_buffers_1 buffer) {
}
-void CsvParser::process_line(shared_ptr<vector<uint8_t>> packet) {
+void CsvSampleParser::process_line(shared_ptr<vector<uint8_t>> packet) {
auto timestamp = std::chrono::system_clock::now().time_since_epoch().count();
auto s = std::string((char *) packet->data(), packet->size());
@@ -193,7 +201,7 @@ void CsvParser::process_line(shared_ptr<vector<uint8_t>> packet) {
map<string, string> values;
values[key] = value;
- sample[key] = value;
+ sample.set(key, value);
flags |= boost::match_prev_avail;
flags |= boost::match_not_bob;
@@ -204,5 +212,67 @@ void CsvParser::process_line(shared_ptr<vector<uint8_t>> packet) {
}
}
+AutoSampleParser::AutoSampleParser(shared_ptr<SampleOutputStream> output) :
+ SampleStreamParser(sample_format_type::AUTO), csvParser(new CsvSampleParser(output)) {
+ // Directly select the parser now until we have more than one parser
+ parser = std::move(csvParser);
+ type_ = sample_format_type::CSV;
+}
+
+void AutoSampleParser::process(mutable_buffers_1 buffer) {
+ if (parser) {
+ parser->process(buffer);
+ } else {
+ throw runtime_error("Not implemented yet");
+ }
+}
+
+string to_string(const sample_format_type &arg) {
+ if (arg == sample_format_type::AUTO)
+ return "auto";
+ else if (arg == sample_format_type::CSV)
+ return "csv";
+ else if (arg == sample_format_type::JSON)
+ return "json";
+ else if (arg == sample_format_type::SQL)
+ return "sql";
+ else
+ throw std::runtime_error("Unknown format value: " + to_string(arg));
+}
+
+unique_ptr<SampleStreamParser> open_sample_input_stream(shared_ptr<SampleOutputStream> output, sample_format_type type) {
+ if (type == sample_format_type::CSV) {
+ return make_unique<CsvSampleParser>(output);
+ } else if (type == sample_format_type::AUTO) {
+ return make_unique<AutoSampleParser>(output);
+ } else {
+ throw sample_exception("Unsupported format type: " + to_string(type));
+ }
+}
+
+unique_ptr<SampleOutputStream> open_sample_output_stream(sample_format_type type, unique_ptr<ostream> output, boost::optional<vector<string>> fields) {
+ if (type == sample_format_type::CSV) {
+ if (fields) {
+ return make_unique<CsvSampleOutputStream>(move(output), fields.get());
+ } else {
+ return make_unique<CsvSampleOutputStream>(move(output));
+ }
+ } else if (type == sample_format_type::JSON) {
+ if (fields) {
+ return make_unique<JsonSampleOutputStream>(move(output), fields.get());
+ } else {
+ return make_unique<JsonSampleOutputStream>(move(output));
+ }
+// } else if (type == sample_format_type::SQL) {
+// if (fields) {
+// return make_unique<SqlSampleOutputStream>(move(output), table_name, fields.get());
+// } else {
+// return make_unique<SqlSampleOutputStream>(move(output), table_name);
+// }
+ } else {
+ throw sample_exception("Unsupported format type: " + to_string(type));
+ }
+}
+
}
}
diff --git a/apps/SoilMoistureIo.h b/apps/SoilMoistureIo.h
index 9a144e3..b8f08e9 100644
--- a/apps/SoilMoistureIo.h
+++ b/apps/SoilMoistureIo.h
@@ -4,8 +4,11 @@
#include <ostream>
#include <vector>
#include <map>
+#include <map>
#include <memory>
#include <boost/asio/buffer.hpp>
+#include <boost/optional.hpp>
+#include <boost/lexical_cast.hpp>
#include <functional>
namespace trygvis {
@@ -14,6 +17,27 @@ namespace soil_moisture {
using namespace std;
using namespace boost::asio;
+template<typename A>
+using o = boost::optional<A>;
+
+enum class sample_format_type {
+ AUTO,
+ CSV,
+ JSON,
+ SQL
+};
+
+string to_string(const sample_format_type &arg);
+
+class SampleStreamParser;
+
+class SampleOutputStream;
+
+unique_ptr<SampleStreamParser> open_sample_input_stream(shared_ptr<SampleOutputStream> output, sample_format_type type = sample_format_type::AUTO);
+
+unique_ptr<SampleOutputStream> open_sample_output_stream(sample_format_type type, unique_ptr<ostream> output,
+ o<vector<string>> fields = o<vector<string>>());
+
class Sample {
public:
Sample() : entries() {
@@ -34,31 +58,65 @@ public:
return entries.end();
}
- string &operator[](string key) {
- return entries[key];
+ /**
+ * @throws std::out_of_range
+ */
+ inline const string &operator[](string key) {
+ return at(key);
+ }
+
+ /**
+ * @throws std::out_of_range
+ */
+ const string &at(string key) {
+ return entries.at(key);
+ }
+
+ template<class A>
+ const A lexical_at(string key) {
+ return boost::lexical_cast<A>(entries.at(key));
+ }
+
+ void set(const std::string &key, const std::string &value) {
+ entries[key] = value;
}
private:
map<string, string> entries;
};
+class sample_exception : public runtime_error {
+public:
+ sample_exception(const string &what) : runtime_error(what) {
+ }
+};
+
class SampleOutputStream {
public:
virtual void write(Sample sample) = 0;
};
+class VectorSampleOutputStream : public SampleOutputStream {
+
+public:
+ virtual void write(Sample sample) override;
+
+public:
+ vector<Sample> samples;
+};
+
class CsvSampleOutputStream : public SampleOutputStream {
public:
- CsvSampleOutputStream(ostream &stream);
+ CsvSampleOutputStream(unique_ptr<ostream> stream);
- CsvSampleOutputStream(ostream &stream, vector<string> fields);
+ CsvSampleOutputStream(unique_ptr<ostream> stream, vector<string> fields);
void write(Sample values);
private:
void writeHeader();
- ostream &stream;
+ unique_ptr<ostream> stream;
bool headerWritten;
bool filterFields;
vector<string> fields;
@@ -66,40 +124,56 @@ private:
class JsonSampleOutputStream : public SampleOutputStream {
public:
- JsonSampleOutputStream(ostream &stream);
+ JsonSampleOutputStream(unique_ptr<ostream> stream);
- JsonSampleOutputStream(ostream &stream, vector<string> fields);
+ JsonSampleOutputStream(unique_ptr<ostream> stream, vector<string> fields);
void write(Sample values);
private:
- ostream &stream;
+ unique_ptr<ostream> stream;
bool filterFields;
vector<string> fields;
};
class SqlSampleOutputStream : public SampleOutputStream {
public:
- SqlSampleOutputStream(ostream &stream, string table_name);
+ SqlSampleOutputStream(unique_ptr<ostream> stream, string table_name);
- SqlSampleOutputStream(ostream &stream, string table_name, vector<string> fields);
+ SqlSampleOutputStream(unique_ptr<ostream> stream, string table_name, vector<string> fields);
void write(Sample values);
private:
- ostream &stream;
+ unique_ptr<ostream> stream;
bool filter_fields;
vector<string> fields;
const string table_name;
};
-class CsvParser {
+class SampleStreamParser {
+public:
+ virtual void process(mutable_buffers_1 buffer) = 0;
+
+ virtual sample_format_type type() {
+ return type_;
+ }
+
+protected:
+ sample_format_type type_;
+
+ SampleStreamParser(const sample_format_type type) : type_(type) {
+ }
+};
+
+class CsvSampleParser : public SampleStreamParser {
public:
- CsvParser(shared_ptr<SampleOutputStream> output) : output(output), line(make_shared<vector<uint8_t>>()) {
+ CsvSampleParser(shared_ptr<SampleOutputStream> output) : SampleStreamParser(sample_format_type::CSV),
+ output(output), line(make_shared<vector<uint8_t>>()) {
}
- void process(mutable_buffers_1 buffer);
+ void process(mutable_buffers_1 buffer) override;
private:
void process_line(shared_ptr<vector<uint8_t>> packet);
@@ -109,6 +183,17 @@ private:
shared_ptr<vector<uint8_t>> line;
};
+class AutoSampleParser : public SampleStreamParser {
+public:
+ AutoSampleParser(shared_ptr<SampleOutputStream> output);
+
+private:
+ unique_ptr<SampleStreamParser> parser;
+ unique_ptr<CsvSampleParser> csvParser;
+public:
+ virtual void process(mutable_buffers_1 buffer);
+};
+
}
}
diff --git a/apps/apps.h b/apps/apps.h
index e5d712c..1995e43 100644
--- a/apps/apps.h
+++ b/apps/apps.h
@@ -35,6 +35,9 @@ int launch_app(int argc, char *argv[], app &app);
std::string get_hostname();
+static inline void noop_deleter(void *) {
+}
+
}
}
diff --git a/apps/sample-convert.cpp b/apps/sample-convert.cpp
index 7bc8d0b..249b737 100644
--- a/apps/sample-convert.cpp
+++ b/apps/sample-convert.cpp
@@ -14,10 +14,6 @@ namespace po = boost::program_options;
class sample_convert : public app {
public:
- sample_convert() : table_name(""), input_file(""), input_format(""),
- output_file(""), output_format("") {
- }
-
void add_options(po::options_description_easy_init &options) override {
options
("help", "produce this help message")
@@ -52,11 +48,11 @@ public:
}
}
- ostream *outputStream;
+ unique_ptr<ostream> outputStream;
if (output_file == "-") {
- outputStream = &cout;
+ outputStream = unique_ptr<ostream>(&cout);
} else {
- outputStream = new ofstream(output_file);
+ outputStream = make_unique<ofstream>(output_file);
if (outputStream->fail()) {
cerr << "Unable to open output file " << output_file << endl;
return EXIT_FAILURE;
@@ -64,22 +60,22 @@ public:
}
if (output_format == "plain") {
- output = make_shared<CsvSampleOutputStream>(*outputStream);
+ output = make_shared<CsvSampleOutputStream>(move(outputStream));
} else if (output_format == "json") {
- output = make_shared<JsonSampleOutputStream>(*outputStream);
+ output = make_shared<JsonSampleOutputStream>(move(outputStream));
} else if (output_format == "sql") {
if (table_name.size() == 0) {
cerr << "Missing option: table-name" << endl;
return EXIT_FAILURE;
}
- output = make_shared<SqlSampleOutputStream>(*outputStream, table_name);
+ output = make_shared<SqlSampleOutputStream>(move(outputStream), table_name);
} else {
cerr << "Unsupported output format: " << output_format << endl;
return EXIT_FAILURE;
}
- auto input = make_shared<CsvParser>(output);
+ auto input = make_shared<CsvSampleParser>(output);
char data[100];
while (!inputStream->eof()) {
@@ -87,8 +83,6 @@ public:
input->process(boost::asio::buffer(data, 1));
}
- delete outputStream;
-
return EXIT_SUCCESS;
}
diff --git a/apps/sample-timestamp.cpp b/apps/sample-timestamp.cpp
new file mode 100644
index 0000000..3a0b3e0
--- /dev/null
+++ b/apps/sample-timestamp.cpp
@@ -0,0 +1,156 @@
+#include "SoilMoistureIo.h"
+#include "apps.h"
+#include <fstream>
+#include <sys/stat.h>
+
+namespace trygvis {
+namespace apps {
+
+using namespace std;
+using namespace trygvis::apps;
+using namespace trygvis::soil_moisture;
+namespace po = boost::program_options;
+
+class TimestampFixingSampleOutputStream : public SampleOutputStream {
+
+public:
+ TimestampFixingSampleOutputStream(string timestamp_name, string now_name, time_t start_time, shared_ptr<SampleOutputStream> output) :
+ timestamp_name_(timestamp_name), now_name_(now_name), start_time_(start_time), output_(output) {
+ }
+
+ virtual void write(Sample sample) override {
+ long relative_time = sample.lexical_at<long>(now_name_);
+
+ string new_value = std::to_string(start_time_ + relative_time);
+ sample.set(timestamp_name_, new_value);
+
+ output_->write(sample);
+ };
+
+private:
+ string now_name_, timestamp_name_;
+ time_t start_time_;
+ shared_ptr<SampleOutputStream> output_;
+};
+
+class sample_timestamp : public app {
+
+private:
+ string input_file, timestamp_name, now_name;
+
+public:
+ sample_timestamp() : input_file("") {
+ }
+
+ void add_options(po::options_description_easy_init &options) override {
+ options
+ ("help", "produce this help message")
+ ("input", po::value<string>(&input_file)->required())
+ ("now-name", po::value<string>(&now_name)->default_value("now"))
+ ("timestamp-name", po::value<string>(&timestamp_name)->default_value("timestamp"));
+ }
+
+ int main(app_execution &execution) override {
+ ifstream input(input_file, std::ifstream::in | std::ifstream::binary);
+
+ if (input.fail()) {
+ cerr << "Could not open file: " << input_file << endl;
+ return EXIT_FAILURE;
+ }
+
+ const int buffer_size = 100;
+
+ input.seekg(-buffer_size, ios_base::end);
+
+ struct stat buf;
+
+ if (stat(input_file.c_str(), &buf)) {
+ cerr << "stat failed" << endl;
+ return EXIT_FAILURE;
+ }
+
+ auto sample_buffer = make_shared<VectorSampleOutputStream>();
+ unique_ptr<SampleStreamParser> parser = open_sample_input_stream(sample_buffer);
+ while (!input.eof()) {
+ char buffer[buffer_size];
+ input.read(buffer, buffer_size);
+
+ if (input.bad()) {
+ cerr << "Error reading input" << endl;
+ return EXIT_FAILURE;
+ }
+
+ size_t count = (size_t) input.gcount();
+
+ mutable_buffers_1 b = boost::asio::buffer(buffer, count);
+ parser->process(b);
+ }
+
+ if (sample_buffer->samples.empty()) {
+ cerr << "Could not find any samples" << endl;
+ return EXIT_FAILURE;
+ }
+
+ time_t end_time = buf.st_mtim.tv_sec;
+
+ Sample sample = *--sample_buffer->samples.end();
+
+ string s;
+ try {
+ s = sample.at(now_name);
+ } catch (out_of_range &e) {
+ cerr << "Missing key '" + now_name + "'." << endl;
+ return EXIT_FAILURE;
+ }
+
+ long now;
+ try {
+ now = boost::lexical_cast<long>(s);
+ } catch (const boost::bad_lexical_cast &e) {
+ cerr << "Bad integer value '" + s + "'." << endl;
+ return EXIT_FAILURE;
+ }
+
+ time_t start_time = end_time - now;
+ cerr << "end_time " << end_time << endl;
+ cerr << "now " << now << endl;
+ cerr << "start_time " << start_time << endl;
+
+ // Restart the reading of the input file and add the adjusted timestamp
+ input.clear(ios::eofbit);
+ input.seekg(0);
+ if(input.fail()) {
+ cerr << "Coult not seek input file" << endl;
+ return EXIT_FAILURE;
+ }
+
+ auto output_stream = open_sample_output_stream(parser->type(), unique_ptr<ostream>(&cout));
+ auto p = make_shared<TimestampFixingSampleOutputStream>("timestamp", now_name, start_time, move(output_stream));
+ parser = open_sample_input_stream(p, parser->type());
+
+ int recordCount = 0;
+
+ while (!input.eof()) {
+ char buffer[buffer_size];
+
+ size_t gcount = (size_t)input.readsome(buffer, buffer_size);
+
+ recordCount++;
+
+ mutable_buffers_1 b = boost::asio::buffer(buffer, gcount);
+ parser->process(b);
+ }
+
+ return EXIT_SUCCESS;
+ }
+};
+
+}
+}
+
+using namespace trygvis::apps;
+
+int main(int argc, char *argv[]) {
+ sample_timestamp app;
+ return launch_app(argc, argv, app);
+}
diff --git a/apps/sm-serial-read.cpp b/apps/sm-serial-read.cpp
index fee1a8c..04a718d 100644
--- a/apps/sm-serial-read.cpp
+++ b/apps/sm-serial-read.cpp
@@ -59,7 +59,7 @@ string hostname = get_hostname();
class port_handler {
public:
- port_handler(string device, serial_port &serial_port, shared_ptr<CsvParser> input) :
+ port_handler(string device, serial_port &serial_port, shared_ptr<CsvSampleParser> input) :
device(device), port(serial_port), input(input) {
}
@@ -84,7 +84,7 @@ private:
uint8_t data[size];
mutable_buffers_1 buffer = boost::asio::buffer(data, size);
- shared_ptr<CsvParser> input;
+ shared_ptr<CsvSampleParser> input;
};
class sm_serial_read : public app {
@@ -120,20 +120,21 @@ public:
cerr << "port is not open" << endl;
}
- shared_ptr <SampleOutputStream> output;
+ shared_ptr<SampleOutputStream> output;
+ unique_ptr<ostream> outputStream = unique_ptr<ostream>(&cout);
if (format == Format::JSON) {
- output = make_shared<JsonSampleOutputStream>(cout);
+ output = make_shared<JsonSampleOutputStream>(std::move(outputStream));
} else if (format == Format::SQL) {
- output = make_shared<SqlSampleOutputStream>(cout, "raw");
+ output = make_shared<SqlSampleOutputStream>(std::move(outputStream), "raw");
} else if (format == Format::PLAIN) {
- output = make_shared<CsvSampleOutputStream>(cout);
+ output = make_shared<CsvSampleOutputStream>(std::move(outputStream));
} else {
cerr << "Unsupported format: " << boost::lexical_cast<string>(format) << endl;
return EXIT_FAILURE;
}
- shared_ptr <CsvParser> input = make_shared<CsvParser>(output);
+ shared_ptr<CsvSampleParser> input = make_shared<CsvSampleParser>(output);
port_handler(port_name, port, input).run();