#include "SoilMoistureIo.h" #include "json.hpp" #include #include #include namespace trygvis { namespace soil_moisture { using namespace std; using json = nlohmann::json; void VectorSampleOutputStream::write(Sample sample) { samples.emplace_back(sample); } CsvSampleOutputStream::CsvSampleOutputStream(unique_ptr stream) : stream(move(stream)), filterFields(false), headerWritten(false) { } CsvSampleOutputStream::CsvSampleOutputStream(unique_ptr stream, vector fields) : stream(move(stream)), fields(fields), filterFields(true), headerWritten(false) { } void CsvSampleOutputStream::write(Sample values) { if (!headerWritten) { writeHeader(); headerWritten = true; } auto &s = *stream.get(); if (filterFields) { auto i = fields.begin(); while (i != fields.end()) { if (i != fields.begin()) { s << ","; } auto field = *i++; auto value = values.find(field); if (value != values.end()) { s << value->second; } } } else { for (auto i = values.begin(); i != values.end();) { s << "\"" << (*i).second << "\""; if (++i != values.end()) { s << ","; } } } s << endl; } void CsvSampleOutputStream::writeHeader() { if (fields.size() == 0) { return; } auto &s = *stream.get(); auto i = fields.begin(); while (i != fields.end()) { s << *i; i++; if (i != fields.end()) { s << ","; } } s << endl; } JsonSampleOutputStream::JsonSampleOutputStream(unique_ptr stream) : stream(move(stream)), fields(), filterFields(false) { } JsonSampleOutputStream::JsonSampleOutputStream(unique_ptr stream, vector fields) : stream(move(stream)), fields(fields), filterFields(true) { } void JsonSampleOutputStream::write(Sample values) { json doc({}); if (filterFields) { for (auto &f: fields) { auto value = values.find(f); if (value != values.end()) { doc[f] = value->second; } } } else { for (auto &v: values) { doc[v.first] = v.second; } } *stream.get() << doc << endl; } SqlSampleOutputStream::SqlSampleOutputStream(unique_ptr stream, string table_name) : stream(move(stream)), table_name(table_name), filter_fields(false) { } SqlSampleOutputStream::SqlSampleOutputStream(unique_ptr stream, string table_name, vector fields) : stream(move(stream)), table_name(table_name), fields(fields), filter_fields(true) { } void SqlSampleOutputStream::write(Sample values) { string fs, vs; fs.reserve(1024); vs.reserve(1024); if (filter_fields) { auto i = fields.begin(); while (i != fields.end()) { auto field = *i; fs += field; auto value = values.find(field); if (value != values.end()) { vs += "'" + value->second + "'"; } else { vs += "NULL"; } i++; if (i != fields.end()) { fs += ","; vs += ","; } } } else { auto i = values.begin(); while (i != values.end()) { auto v = *i++; fs += v.first; vs += "'" + v.second + "'"; if (i != values.end()) { fs += ","; vs += ","; } } } (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl; } void CsvSampleParser::process(mutable_buffers_1 buffer) { size_t some = buffer_size(buffer); auto data = boost::asio::buffer_cast(buffer); for (int i = 0; i < some; i++) { uint8_t b = data[i]; if (b == packet_delimiter) { process_line(line); line = make_shared>(); } else { line->push_back(b); } } } void CsvSampleParser::process_line(shared_ptr> packet) { auto timestamp = std::chrono::system_clock::now().time_since_epoch().count(); auto s = std::string((char *) packet->data(), packet->size()); static const boost::regex e("([_a-zA-Z0-9]+)=([0-9]+)"); std::string::const_iterator start = s.begin(); std::string::const_iterator end = s.end(); boost::match_results what; boost::match_flag_type flags = boost::match_default; Sample sample; while (regex_search(start, end, what, e, flags)) { auto key = static_cast(what[1]); auto value = static_cast(what[2]); start = what[0].second; map values; values[key] = value; sample.set(key, value); flags |= boost::match_prev_avail; flags |= boost::match_not_bob; } if (sample.begin() != sample.end()) { output->write(sample); } } AutoSampleParser::AutoSampleParser(shared_ptr output) : SampleStreamParser(sample_format_type::AUTO), csvParser(new CsvSampleParser(output)) { // Directly select the parser now until we have more than one parser parser = std::move(csvParser); type_ = sample_format_type::CSV; } void AutoSampleParser::process(mutable_buffers_1 buffer) { if (parser) { parser->process(buffer); } else { throw runtime_error("Not implemented yet"); } } string to_string(const sample_format_type &arg) { if (arg == sample_format_type::AUTO) return "auto"; else if (arg == sample_format_type::CSV) return "csv"; else if (arg == sample_format_type::JSON) return "json"; else if (arg == sample_format_type::SQL) return "sql"; else throw std::runtime_error("Unknown format value: " + to_string(arg)); } unique_ptr open_sample_input_stream(shared_ptr output, sample_format_type type) { if (type == sample_format_type::CSV) { return make_unique(output); } else if (type == sample_format_type::AUTO) { return make_unique(output); } else { throw sample_exception("Unsupported format type: " + to_string(type)); } } unique_ptr open_sample_output_stream(sample_format_type type, unique_ptr output, boost::optional> fields) { if (type == sample_format_type::CSV) { if (fields) { return make_unique(move(output), fields.get()); } else { return make_unique(move(output)); } } else if (type == sample_format_type::JSON) { if (fields) { return make_unique(move(output), fields.get()); } else { return make_unique(move(output)); } // } else if (type == sample_format_type::SQL) { // if (fields) { // return make_unique(move(output), table_name, fields.get()); // } else { // return make_unique(move(output), table_name); // } } else { throw sample_exception("Unsupported format type: " + to_string(type)); } } } }