diff options
-rw-r--r-- | .gitmodules | 3 | ||||
-rw-r--r-- | apps/CMakeLists.txt | 13 | ||||
-rw-r--r-- | apps/SoilMoistureIo.cpp | 122 | ||||
-rw-r--r-- | apps/SoilMoistureIo.h | 113 | ||||
-rw-r--r-- | apps/apps.h | 3 | ||||
-rw-r--r-- | apps/sample-convert.cpp | 20 | ||||
-rw-r--r-- | apps/sample-timestamp.cpp | 156 | ||||
-rw-r--r-- | apps/sm-serial-read.cpp | 15 | ||||
m--------- | json | 0 |
9 files changed, 374 insertions, 71 deletions
diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..c971d9f --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "json"] + path = json + url = https://github.com/nlohmann/json.git diff --git a/apps/CMakeLists.txt b/apps/CMakeLists.txt index e2082e1..5510cb3 100644 --- a/apps/CMakeLists.txt +++ b/apps/CMakeLists.txt @@ -1,6 +1,7 @@ list(APPEND APPS log4cplus-test) list(APPEND APPS ble-inspect-device) list(APPEND APPS sample-convert) +list(APPEND APPS sample-timestamp) list(APPEND APPS sm-db-insert) list(APPEND APPS sm-db-select) list(APPEND APPS sm-get-value) @@ -32,19 +33,9 @@ if(LOG4CPLUS_LIBRARIES MATCHES NOTFOUND) message(FATAL_ERROR "Could not find log4cplus library files") endif() -include(ExternalProject) -ExternalProject_Add( - JSON - PREFIX json - GIT_REPOSITORY https://github.com/nlohmann/json.git - GIT_TAG ec42245951fceb7594bfb24746c7449986c3c2a4 - CONFIGURE_COMMAND "" - BUILD_COMMAND "" - INSTALL_COMMAND "") - foreach(app ${APPS}) include_directories("${PROJECT_SOURCE_DIR}/include") - include_directories("${CMAKE_BINARY_DIR}/apps/json/src/JSON/src") + include_directories("${PROJECT_SOURCE_DIR}/json/src") add_executable(${app} ${app}.cpp) diff --git a/apps/SoilMoistureIo.cpp b/apps/SoilMoistureIo.cpp index fe485b3..315f48d 100644 --- a/apps/SoilMoistureIo.cpp +++ b/apps/SoilMoistureIo.cpp @@ -11,12 +11,16 @@ namespace soil_moisture { using namespace std; using json = nlohmann::json; -CsvSampleOutputStream::CsvSampleOutputStream(ostream &stream) : - stream(stream), filterFields(false), headerWritten(false) { +void VectorSampleOutputStream::write(Sample sample) { + samples.emplace_back(sample); } -CsvSampleOutputStream::CsvSampleOutputStream(ostream &stream, vector<string> fields) : - stream(stream), fields(fields), filterFields(true), headerWritten(false) { +CsvSampleOutputStream::CsvSampleOutputStream(unique_ptr<ostream> stream) : + stream(move(stream)), filterFields(false), headerWritten(false) { +} + +CsvSampleOutputStream::CsvSampleOutputStream(unique_ptr<ostream> stream, vector<string> fields) : + stream(move(stream)), fields(fields), filterFields(true), headerWritten(false) { } void CsvSampleOutputStream::write(Sample values) { @@ -25,31 +29,33 @@ void CsvSampleOutputStream::write(Sample values) { headerWritten = true; } + auto &s = *stream.get(); + if (filterFields) { auto i = fields.begin(); while (i != fields.end()) { if (i != fields.begin()) { - stream << ","; + s << ","; } auto field = *i++; auto value = values.find(field); if (value != values.end()) { - stream << value->second; + s << value->second; } } } else { for (auto i = values.begin(); i != values.end();) { - stream << "\"" << (*i).second << "\""; + s << "\"" << (*i).second << "\""; if (++i != values.end()) { - stream << ","; + s << ","; } } } - stream << endl; + s << endl; } void CsvSampleOutputStream::writeHeader() { @@ -57,26 +63,28 @@ void CsvSampleOutputStream::writeHeader() { return; } + auto &s = *stream.get(); + auto i = fields.begin(); while (i != fields.end()) { - stream << *i; + s << *i; i++; if (i != fields.end()) { - stream << ","; + s << ","; } } - stream << endl; + s << endl; } -JsonSampleOutputStream::JsonSampleOutputStream(ostream &stream) : - stream(stream), fields(), filterFields(false) { +JsonSampleOutputStream::JsonSampleOutputStream(unique_ptr<ostream> stream) : + stream(move(stream)), fields(), filterFields(false) { } -JsonSampleOutputStream::JsonSampleOutputStream(ostream &stream, vector<string> fields) : - stream(stream), fields(fields), filterFields(true) { +JsonSampleOutputStream::JsonSampleOutputStream(unique_ptr<ostream> stream, vector<string> fields) : + stream(move(stream)), fields(fields), filterFields(true) { } void JsonSampleOutputStream::write(Sample values) { @@ -96,15 +104,15 @@ void JsonSampleOutputStream::write(Sample values) { } } - stream << doc << endl; + *stream.get() << doc << endl; } -SqlSampleOutputStream::SqlSampleOutputStream(ostream &stream, string table_name) : - stream(stream), table_name(table_name), filter_fields(false) { +SqlSampleOutputStream::SqlSampleOutputStream(unique_ptr<ostream> stream, string table_name) : + stream(move(stream)), table_name(table_name), filter_fields(false) { } -SqlSampleOutputStream::SqlSampleOutputStream(ostream &stream, string table_name, vector<string> fields) : - stream(stream), table_name(table_name), fields(fields), filter_fields(true) { +SqlSampleOutputStream::SqlSampleOutputStream(unique_ptr<ostream> stream, string table_name, vector<string> fields) : + stream(move(stream)), table_name(table_name), fields(fields), filter_fields(true) { } void SqlSampleOutputStream::write(Sample values) { @@ -126,7 +134,7 @@ void SqlSampleOutputStream::write(Sample values) { if (value != values.end()) { vs += "'" + value->second + "'"; } else { - vs + "NULL"; + vs += "NULL"; } i++; @@ -151,10 +159,10 @@ void SqlSampleOutputStream::write(Sample values) { } } - stream << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl; + (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl; } -void CsvParser::process(mutable_buffers_1 buffer) { +void CsvSampleParser::process(mutable_buffers_1 buffer) { size_t some = buffer_size(buffer); auto data = boost::asio::buffer_cast<const uint8_t *>(buffer); @@ -172,7 +180,7 @@ void CsvParser::process(mutable_buffers_1 buffer) { } -void CsvParser::process_line(shared_ptr<vector<uint8_t>> packet) { +void CsvSampleParser::process_line(shared_ptr<vector<uint8_t>> packet) { auto timestamp = std::chrono::system_clock::now().time_since_epoch().count(); auto s = std::string((char *) packet->data(), packet->size()); @@ -193,7 +201,7 @@ void CsvParser::process_line(shared_ptr<vector<uint8_t>> packet) { map<string, string> values; values[key] = value; - sample[key] = value; + sample.set(key, value); flags |= boost::match_prev_avail; flags |= boost::match_not_bob; @@ -204,5 +212,67 @@ void CsvParser::process_line(shared_ptr<vector<uint8_t>> packet) { } } +AutoSampleParser::AutoSampleParser(shared_ptr<SampleOutputStream> output) : + SampleStreamParser(sample_format_type::AUTO), csvParser(new CsvSampleParser(output)) { + // Directly select the parser now until we have more than one parser + parser = std::move(csvParser); + type_ = sample_format_type::CSV; +} + +void AutoSampleParser::process(mutable_buffers_1 buffer) { + if (parser) { + parser->process(buffer); + } else { + throw runtime_error("Not implemented yet"); + } +} + +string to_string(const sample_format_type &arg) { + if (arg == sample_format_type::AUTO) + return "auto"; + else if (arg == sample_format_type::CSV) + return "csv"; + else if (arg == sample_format_type::JSON) + return "json"; + else if (arg == sample_format_type::SQL) + return "sql"; + else + throw std::runtime_error("Unknown format value: " + to_string(arg)); +} + +unique_ptr<SampleStreamParser> open_sample_input_stream(shared_ptr<SampleOutputStream> output, sample_format_type type) { + if (type == sample_format_type::CSV) { + return make_unique<CsvSampleParser>(output); + } else if (type == sample_format_type::AUTO) { + return make_unique<AutoSampleParser>(output); + } else { + throw sample_exception("Unsupported format type: " + to_string(type)); + } +} + +unique_ptr<SampleOutputStream> open_sample_output_stream(sample_format_type type, unique_ptr<ostream> output, boost::optional<vector<string>> fields) { + if (type == sample_format_type::CSV) { + if (fields) { + return make_unique<CsvSampleOutputStream>(move(output), fields.get()); + } else { + return make_unique<CsvSampleOutputStream>(move(output)); + } + } else if (type == sample_format_type::JSON) { + if (fields) { + return make_unique<JsonSampleOutputStream>(move(output), fields.get()); + } else { + return make_unique<JsonSampleOutputStream>(move(output)); + } +// } else if (type == sample_format_type::SQL) { +// if (fields) { +// return make_unique<SqlSampleOutputStream>(move(output), table_name, fields.get()); +// } else { +// return make_unique<SqlSampleOutputStream>(move(output), table_name); +// } + } else { + throw sample_exception("Unsupported format type: " + to_string(type)); + } +} + } } diff --git a/apps/SoilMoistureIo.h b/apps/SoilMoistureIo.h index 9a144e3..b8f08e9 100644 --- a/apps/SoilMoistureIo.h +++ b/apps/SoilMoistureIo.h @@ -4,8 +4,11 @@ #include <ostream> #include <vector> #include <map> +#include <map> #include <memory> #include <boost/asio/buffer.hpp> +#include <boost/optional.hpp> +#include <boost/lexical_cast.hpp> #include <functional> namespace trygvis { @@ -14,6 +17,27 @@ namespace soil_moisture { using namespace std; using namespace boost::asio; +template<typename A> +using o = boost::optional<A>; + +enum class sample_format_type { + AUTO, + CSV, + JSON, + SQL +}; + +string to_string(const sample_format_type &arg); + +class SampleStreamParser; + +class SampleOutputStream; + +unique_ptr<SampleStreamParser> open_sample_input_stream(shared_ptr<SampleOutputStream> output, sample_format_type type = sample_format_type::AUTO); + +unique_ptr<SampleOutputStream> open_sample_output_stream(sample_format_type type, unique_ptr<ostream> output, + o<vector<string>> fields = o<vector<string>>()); + class Sample { public: Sample() : entries() { @@ -34,31 +58,65 @@ public: return entries.end(); } - string &operator[](string key) { - return entries[key]; + /** + * @throws std::out_of_range + */ + inline const string &operator[](string key) { + return at(key); + } + + /** + * @throws std::out_of_range + */ + const string &at(string key) { + return entries.at(key); + } + + template<class A> + const A lexical_at(string key) { + return boost::lexical_cast<A>(entries.at(key)); + } + + void set(const std::string &key, const std::string &value) { + entries[key] = value; } private: map<string, string> entries; }; +class sample_exception : public runtime_error { +public: + sample_exception(const string &what) : runtime_error(what) { + } +}; + class SampleOutputStream { public: virtual void write(Sample sample) = 0; }; +class VectorSampleOutputStream : public SampleOutputStream { + +public: + virtual void write(Sample sample) override; + +public: + vector<Sample> samples; +}; + class CsvSampleOutputStream : public SampleOutputStream { public: - CsvSampleOutputStream(ostream &stream); + CsvSampleOutputStream(unique_ptr<ostream> stream); - CsvSampleOutputStream(ostream &stream, vector<string> fields); + CsvSampleOutputStream(unique_ptr<ostream> stream, vector<string> fields); void write(Sample values); private: void writeHeader(); - ostream &stream; + unique_ptr<ostream> stream; bool headerWritten; bool filterFields; vector<string> fields; @@ -66,40 +124,56 @@ private: class JsonSampleOutputStream : public SampleOutputStream { public: - JsonSampleOutputStream(ostream &stream); + JsonSampleOutputStream(unique_ptr<ostream> stream); - JsonSampleOutputStream(ostream &stream, vector<string> fields); + JsonSampleOutputStream(unique_ptr<ostream> stream, vector<string> fields); void write(Sample values); private: - ostream &stream; + unique_ptr<ostream> stream; bool filterFields; vector<string> fields; }; class SqlSampleOutputStream : public SampleOutputStream { public: - SqlSampleOutputStream(ostream &stream, string table_name); + SqlSampleOutputStream(unique_ptr<ostream> stream, string table_name); - SqlSampleOutputStream(ostream &stream, string table_name, vector<string> fields); + SqlSampleOutputStream(unique_ptr<ostream> stream, string table_name, vector<string> fields); void write(Sample values); private: - ostream &stream; + unique_ptr<ostream> stream; bool filter_fields; vector<string> fields; const string table_name; }; -class CsvParser { +class SampleStreamParser { +public: + virtual void process(mutable_buffers_1 buffer) = 0; + + virtual sample_format_type type() { + return type_; + } + +protected: + sample_format_type type_; + + SampleStreamParser(const sample_format_type type) : type_(type) { + } +}; + +class CsvSampleParser : public SampleStreamParser { public: - CsvParser(shared_ptr<SampleOutputStream> output) : output(output), line(make_shared<vector<uint8_t>>()) { + CsvSampleParser(shared_ptr<SampleOutputStream> output) : SampleStreamParser(sample_format_type::CSV), + output(output), line(make_shared<vector<uint8_t>>()) { } - void process(mutable_buffers_1 buffer); + void process(mutable_buffers_1 buffer) override; private: void process_line(shared_ptr<vector<uint8_t>> packet); @@ -109,6 +183,17 @@ private: shared_ptr<vector<uint8_t>> line; }; +class AutoSampleParser : public SampleStreamParser { +public: + AutoSampleParser(shared_ptr<SampleOutputStream> output); + +private: + unique_ptr<SampleStreamParser> parser; + unique_ptr<CsvSampleParser> csvParser; +public: + virtual void process(mutable_buffers_1 buffer); +}; + } } diff --git a/apps/apps.h b/apps/apps.h index e5d712c..1995e43 100644 --- a/apps/apps.h +++ b/apps/apps.h @@ -35,6 +35,9 @@ int launch_app(int argc, char *argv[], app &app); std::string get_hostname(); +static inline void noop_deleter(void *) { +} + } } diff --git a/apps/sample-convert.cpp b/apps/sample-convert.cpp index 7bc8d0b..249b737 100644 --- a/apps/sample-convert.cpp +++ b/apps/sample-convert.cpp @@ -14,10 +14,6 @@ namespace po = boost::program_options; class sample_convert : public app { public: - sample_convert() : table_name(""), input_file(""), input_format(""), - output_file(""), output_format("") { - } - void add_options(po::options_description_easy_init &options) override { options ("help", "produce this help message") @@ -52,11 +48,11 @@ public: } } - ostream *outputStream; + unique_ptr<ostream> outputStream; if (output_file == "-") { - outputStream = &cout; + outputStream = unique_ptr<ostream>(&cout); } else { - outputStream = new ofstream(output_file); + outputStream = make_unique<ofstream>(output_file); if (outputStream->fail()) { cerr << "Unable to open output file " << output_file << endl; return EXIT_FAILURE; @@ -64,22 +60,22 @@ public: } if (output_format == "plain") { - output = make_shared<CsvSampleOutputStream>(*outputStream); + output = make_shared<CsvSampleOutputStream>(move(outputStream)); } else if (output_format == "json") { - output = make_shared<JsonSampleOutputStream>(*outputStream); + output = make_shared<JsonSampleOutputStream>(move(outputStream)); } else if (output_format == "sql") { if (table_name.size() == 0) { cerr << "Missing option: table-name" << endl; return EXIT_FAILURE; } - output = make_shared<SqlSampleOutputStream>(*outputStream, table_name); + output = make_shared<SqlSampleOutputStream>(move(outputStream), table_name); } else { cerr << "Unsupported output format: " << output_format << endl; return EXIT_FAILURE; } - auto input = make_shared<CsvParser>(output); + auto input = make_shared<CsvSampleParser>(output); char data[100]; while (!inputStream->eof()) { @@ -87,8 +83,6 @@ public: input->process(boost::asio::buffer(data, 1)); } - delete outputStream; - return EXIT_SUCCESS; } diff --git a/apps/sample-timestamp.cpp b/apps/sample-timestamp.cpp new file mode 100644 index 0000000..3a0b3e0 --- /dev/null +++ b/apps/sample-timestamp.cpp @@ -0,0 +1,156 @@ +#include "SoilMoistureIo.h" +#include "apps.h" +#include <fstream> +#include <sys/stat.h> + +namespace trygvis { +namespace apps { + +using namespace std; +using namespace trygvis::apps; +using namespace trygvis::soil_moisture; +namespace po = boost::program_options; + +class TimestampFixingSampleOutputStream : public SampleOutputStream { + +public: + TimestampFixingSampleOutputStream(string timestamp_name, string now_name, time_t start_time, shared_ptr<SampleOutputStream> output) : + timestamp_name_(timestamp_name), now_name_(now_name), start_time_(start_time), output_(output) { + } + + virtual void write(Sample sample) override { + long relative_time = sample.lexical_at<long>(now_name_); + + string new_value = std::to_string(start_time_ + relative_time); + sample.set(timestamp_name_, new_value); + + output_->write(sample); + }; + +private: + string now_name_, timestamp_name_; + time_t start_time_; + shared_ptr<SampleOutputStream> output_; +}; + +class sample_timestamp : public app { + +private: + string input_file, timestamp_name, now_name; + +public: + sample_timestamp() : input_file("") { + } + + void add_options(po::options_description_easy_init &options) override { + options + ("help", "produce this help message") + ("input", po::value<string>(&input_file)->required()) + ("now-name", po::value<string>(&now_name)->default_value("now")) + ("timestamp-name", po::value<string>(×tamp_name)->default_value("timestamp")); + } + + int main(app_execution &execution) override { + ifstream input(input_file, std::ifstream::in | std::ifstream::binary); + + if (input.fail()) { + cerr << "Could not open file: " << input_file << endl; + return EXIT_FAILURE; + } + + const int buffer_size = 100; + + input.seekg(-buffer_size, ios_base::end); + + struct stat buf; + + if (stat(input_file.c_str(), &buf)) { + cerr << "stat failed" << endl; + return EXIT_FAILURE; + } + + auto sample_buffer = make_shared<VectorSampleOutputStream>(); + unique_ptr<SampleStreamParser> parser = open_sample_input_stream(sample_buffer); + while (!input.eof()) { + char buffer[buffer_size]; + input.read(buffer, buffer_size); + + if (input.bad()) { + cerr << "Error reading input" << endl; + return EXIT_FAILURE; + } + + size_t count = (size_t) input.gcount(); + + mutable_buffers_1 b = boost::asio::buffer(buffer, count); + parser->process(b); + } + + if (sample_buffer->samples.empty()) { + cerr << "Could not find any samples" << endl; + return EXIT_FAILURE; + } + + time_t end_time = buf.st_mtim.tv_sec; + + Sample sample = *--sample_buffer->samples.end(); + + string s; + try { + s = sample.at(now_name); + } catch (out_of_range &e) { + cerr << "Missing key '" + now_name + "'." << endl; + return EXIT_FAILURE; + } + + long now; + try { + now = boost::lexical_cast<long>(s); + } catch (const boost::bad_lexical_cast &e) { + cerr << "Bad integer value '" + s + "'." << endl; + return EXIT_FAILURE; + } + + time_t start_time = end_time - now; + cerr << "end_time " << end_time << endl; + cerr << "now " << now << endl; + cerr << "start_time " << start_time << endl; + + // Restart the reading of the input file and add the adjusted timestamp + input.clear(ios::eofbit); + input.seekg(0); + if(input.fail()) { + cerr << "Coult not seek input file" << endl; + return EXIT_FAILURE; + } + + auto output_stream = open_sample_output_stream(parser->type(), unique_ptr<ostream>(&cout)); + auto p = make_shared<TimestampFixingSampleOutputStream>("timestamp", now_name, start_time, move(output_stream)); + parser = open_sample_input_stream(p, parser->type()); + + int recordCount = 0; + + while (!input.eof()) { + char buffer[buffer_size]; + + size_t gcount = (size_t)input.readsome(buffer, buffer_size); + + recordCount++; + + mutable_buffers_1 b = boost::asio::buffer(buffer, gcount); + parser->process(b); + } + + return EXIT_SUCCESS; + } +}; + +} +} + +using namespace trygvis::apps; + +int main(int argc, char *argv[]) { + sample_timestamp app; + return launch_app(argc, argv, app); +} diff --git a/apps/sm-serial-read.cpp b/apps/sm-serial-read.cpp index fee1a8c..04a718d 100644 --- a/apps/sm-serial-read.cpp +++ b/apps/sm-serial-read.cpp @@ -59,7 +59,7 @@ string hostname = get_hostname(); class port_handler { public: - port_handler(string device, serial_port &serial_port, shared_ptr<CsvParser> input) : + port_handler(string device, serial_port &serial_port, shared_ptr<CsvSampleParser> input) : device(device), port(serial_port), input(input) { } @@ -84,7 +84,7 @@ private: uint8_t data[size]; mutable_buffers_1 buffer = boost::asio::buffer(data, size); - shared_ptr<CsvParser> input; + shared_ptr<CsvSampleParser> input; }; class sm_serial_read : public app { @@ -120,20 +120,21 @@ public: cerr << "port is not open" << endl; } - shared_ptr <SampleOutputStream> output; + shared_ptr<SampleOutputStream> output; + unique_ptr<ostream> outputStream = unique_ptr<ostream>(&cout); if (format == Format::JSON) { - output = make_shared<JsonSampleOutputStream>(cout); + output = make_shared<JsonSampleOutputStream>(std::move(outputStream)); } else if (format == Format::SQL) { - output = make_shared<SqlSampleOutputStream>(cout, "raw"); + output = make_shared<SqlSampleOutputStream>(std::move(outputStream), "raw"); } else if (format == Format::PLAIN) { - output = make_shared<CsvSampleOutputStream>(cout); + output = make_shared<CsvSampleOutputStream>(std::move(outputStream)); } else { cerr << "Unsupported format: " << boost::lexical_cast<string>(format) << endl; return EXIT_FAILURE; } - shared_ptr <CsvParser> input = make_shared<CsvParser>(output); + shared_ptr<CsvSampleParser> input = make_shared<CsvSampleParser>(output); port_handler(port_name, port, input).run(); diff --git a/json b/json new file mode 160000 +Subproject 5526de1385c53ebc79739b618568fe1c615986b |