#include "SoilMoistureIo.h"

#include "json.hpp"

// NOTE(review): the three angle-bracket include targets were unreadable in the
// reviewed text. The headers below are the ones this translation unit visibly
// uses; restore any original include that is still required.
#include <boost/regex.hpp>

#include <cstddef>
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// NOTE(review): template arguments were also lost in the reviewed text and have
// been reconstructed from how the values are used in this file. Confirm every
// signature against the declarations in SoilMoistureIo.h.

namespace trygvis {
namespace soil_moisture {

using namespace std;
using json = nlohmann::json;

/**
 * Appends the sample to the in-memory `samples` collection.
 */
void VectorSampleOutputStream::write(SampleRecord sample) {
    samples.emplace_back(sample);
}

/**
 * Maps each key to its dictionary index using indexOf().
 *
 * @param keys the keys to look up.
 * @return one index per key, in the same order as `keys`.
 */
vector<KeyDictionary::index_t> KeyDictionary::findIndexes(vector<string> keys) {
    vector<KeyDictionary::index_t> indexes;
    indexes.reserve(keys.size());

    for (auto &key: keys) {
        indexes.push_back(indexOf(key));
    }

    // Returned by name so the compiler can elide the copy; wrapping the local
    // in move() would prevent that.
    return indexes;
}

/**
 * Creates a CSV writer without a fixed field list. The fields are chosen from
 * the first non-empty record passed to write().
 */
CsvSampleOutputStream::CsvSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream) :
        dict(dict), stream(move(stream)), headerWritten(false) {
}

/**
 * Creates a CSV writer that emits the given fields, in the given order.
 */
CsvSampleOutputStream::CsvSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream,
                                             vector<string> fieldKeys) :
        dict(dict), stream(move(stream)), headerWritten(false), fields(dict.findIndexes(fieldKeys)) {
}

/**
 * Writes one record as a comma separated line. The header line is written
 * before the first record. Fields without a value are written as empty cells.
 */
void CsvSampleOutputStream::write(SampleRecord values) {
    // Skip empty records
    if (values.empty()) {
        return;
    }

    // No explicit field list: use every field that has a value in this record.
    if (fields.empty()) {
        KeyDictionary::index_t index = 0;

        for (const auto &value: values) {
            if (value) {
                fields.push_back(index);
            }

            ++index;
        }
    }

    if (!headerWritten) {
        writeHeader();
        headerWritten = true;
    }

    auto &s = *stream;

    bool first = true;
    for (const auto index: fields) {
        if (!first) {
            s << ",";
        }
        first = false;

        const auto &value = values.at(index);

        if (value) {
            s << value.get();
        }
    }

    s << endl;
}

/**
 * Writes the comma separated names of the selected fields followed by a
 * newline.
 */
void CsvSampleOutputStream::writeHeader() {
    auto &s = *stream;

    bool first = true;
    for (const auto index: fields) {
        if (!first) {
            s << ",";
        }
        first = false;

        s << dict.nameOf(index);
    }

    s << endl;
}

JsonSampleOutputStream::JsonSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream) :
        dict(dict), stream(move(stream)), filterFields(false) {
}

JsonSampleOutputStream::JsonSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream,
                                               vector<string> fields) :
        dict(dict), stream(move(stream)), fields(dict.findIndexes(fields)), filterFields(true) {
}

/**
 * Not implemented at the moment.
 *
 * @throws sample_exception always.
 */
void JsonSampleOutputStream::write(SampleRecord values) {
    throw sample_exception("deimplemented");

    // Previous implementation, kept for reference. The statements that used to
    // follow the throw were unreachable and are recorded here as well.
    //
    // json doc({});
    //
    // if (filterFields) {
    //     for (auto &f: fields) {
    //         auto value = values.find(f);
    //
    //         if (value != values.end()) {
    //             doc[f] = value->second;
    //         }
    //     }
    // } else {
    //     for (auto &v: values) {
    //         doc[v.first] = v.second;
    //     }
    // }
    //
    // *stream.get() << doc << endl;
}

SqlSampleOutputStream::SqlSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream, string table_name) :
        dict(dict), stream(move(stream)), table_name(table_name), filter_fields(false) {
}

SqlSampleOutputStream::SqlSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream, string table_name,
                                             vector<string> fields) :
        dict(dict), stream(move(stream)), table_name(table_name), fields(dict.findIndexes(fields)),
        filter_fields(true) {
}

/**
 * Not implemented at the moment.
 *
 * @throws sample_exception always.
 */
void SqlSampleOutputStream::write(SampleRecord values) {
    throw sample_exception("deimplemented");

    // Previous implementation, kept for reference.
    //
    // string fs, vs;
    //
    // fs.reserve(1024);
    // vs.reserve(1024);
    //
    // if (filter_fields) {
    //     auto i = fields.begin();
    //
    //     while (i != fields.end()) {
    //         auto field = *i;
    //
    //         fs += field;
    //
    //         auto value = values.find(field);
    //
    //         if (value != values.end()) {
    //             vs += "'" + value->second + "'";
    //         } else {
    //             vs += "NULL";
    //         }
    //
    //         i++;
    //
    //         if (i != fields.end()) {
    //             fs += ",";
    //             vs += ",";
    //         }
    //     }
    // } else {
    //     auto i = values.begin();
    //     while (i != values.end()) {
    //         auto v = *i++;
    //
    //         fs += v.first;
    //         vs += "'" + v.second + "'";
    //
    //         if (i != values.end()) {
    //             fs += ",";
    //             vs += ",";
    //         }
    //     }
    // }
    //
    // (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl;
}

/**
 * Feeds raw bytes to the parser. Bytes are collected until `packet_delimiter`
 * is seen, at which point the collected line is handed to process_line().
 *
 * An empty buffer while a partial line is pending processes that partial line.
 */
void CsvSampleParser::process(mutable_buffers_1 buffer) {
    const size_t size = buffer_size(buffer);

    if (size == 0 && line->size()) {
        process_line(line);
        line = make_shared<vector<uint8_t>>();
        return;
    }

    auto data = boost::asio::buffer_cast<const uint8_t *>(buffer);

    // size_t index: `size` is unsigned, so an int counter would mix signedness.
    for (size_t i = 0; i < size; i++) {
        uint8_t b = data[i];

        if (b == packet_delimiter) {
            process_line(line);
            line = make_shared<vector<uint8_t>>();
        } else {
            line->push_back(b);
        }
    }
}

/**
 * Extracts every `key=digits` pair from the line, stores them in a single
 * SampleRecord and writes that record to `output`. A record is written even
 * when the line contains no pairs.
 */
void CsvSampleParser::process_line(shared_ptr<vector<uint8_t>> packet) {
    const auto s = std::string(reinterpret_cast<const char *>(packet->data()), packet->size());

    static const boost::regex e("([_a-zA-Z0-9]+)=([0-9]+)");

    std::string::const_iterator start = s.begin();
    std::string::const_iterator end = s.end();
    boost::smatch what;
    boost::match_flag_type flags = boost::match_default;

    SampleRecord sample(dict);

    while (regex_search(start, end, what, e, flags)) {
        const auto key = what[1].str();
        const auto value = what[2].str();

        // Continue searching right after the current match
        start = what[0].second;

        auto index = dict.indexOf(key);
        sample.set(index, value);

        // Later searches start in the middle of the string: there is a valid
        // character before `start` and `start` is not the start of the buffer.
        flags |= boost::match_prev_avail;
        flags |= boost::match_not_bob;
    }

    output->write(sample);
}

AutoSampleParser::AutoSampleParser(KeyDictionary &dict, shared_ptr<SampleOutputStream> output) :
        SampleStreamParser(sample_format_type::AUTO), csvParser(new CsvSampleParser(dict, output)) {
    // Directly select the parser now until we have more than one parser
    parser = std::move(csvParser);
    type_ = sample_format_type::CSV;
}

/**
 * Forwards the buffer to the selected parser.
 *
 * @throws runtime_error if no parser has been selected.
 */
void AutoSampleParser::process(mutable_buffers_1 buffer) {
    if (parser) {
        parser->process(buffer);
    } else {
        throw runtime_error("Not implemented yet");
    }
}

/**
 * Returns the lower case name of the format.
 *
 * @throws std::runtime_error if the value is not one of the known formats.
 */
string to_string(const sample_format_type &arg) {
    if (arg == sample_format_type::AUTO)
        return "auto";
    else if (arg == sample_format_type::CSV)
        return "csv";
    else if (arg == sample_format_type::JSON)
        return "json";
    else if (arg == sample_format_type::SQL)
        return "sql";
    else
        // Format the numeric value with std::to_string. Calling this function
        // again here would recurse forever instead of throwing.
        // NOTE(review): assumes sample_format_type is an enum - confirm in the header.
        throw std::runtime_error("Unknown format value: " + std::to_string(static_cast<int>(arg)));
}

/**
 * Creates a parser for the given input format.
 *
 * @throws sample_exception if the format is neither CSV nor AUTO.
 */
unique_ptr<SampleStreamParser> open_sample_input_stream(
        KeyDictionary &dict,
        shared_ptr<SampleOutputStream> output,
        sample_format_type type) {
    if (type == sample_format_type::CSV) {
        return make_unique<CsvSampleParser>(dict, output);
    } else if (type == sample_format_type::AUTO) {
        return make_unique<AutoSampleParser>(dict, output);
    } else {
        throw sample_exception("Unsupported format type: " + to_string(type));
    }
}

/**
 * Creates a writer for the given output format. When `fields` is set, the
 * writer is constructed with that field list.
 *
 * @throws sample_exception if the format is neither CSV nor JSON.
 */
unique_ptr<SampleOutputStream> open_sample_output_stream(
        KeyDictionary &dict,
        sample_format_type type,
        unique_ptr<ostream> output,
        o<vector<string>> fields) {
    if (type == sample_format_type::CSV) {
        if (fields) {
            return make_unique<CsvSampleOutputStream>(dict, move(output), fields.get());
        } else {
            return make_unique<CsvSampleOutputStream>(dict, move(output));
        }
    } else if (type == sample_format_type::JSON) {
        if (fields) {
            return make_unique<JsonSampleOutputStream>(dict, move(output), fields.get());
        } else {
            return make_unique<JsonSampleOutputStream>(dict, move(output));
        }
//    } else if (type == sample_format_type::SQL) {
//        if (fields) {
//            return make_unique<SqlSampleOutputStream>(dict, move(output), table_name, fields.get());
//        } else {
//            return make_unique<SqlSampleOutputStream>(dict, move(output), table_name);
//        }
    } else {
        throw sample_exception("Unsupported format type: " + to_string(type));
    }
}

}
}