#include "SoilMoistureIo.h"

#include "json.hpp"
// NOTE(review): the targets of the three system #include directives that
// followed "json.hpp" were lost from this file. The headers below are the ones
// the code in this file visibly relies on (boost::regex, std::chrono,
// std::runtime_error) - confirm against version control.
#include <boost/regex.hpp>
#include <chrono>
#include <stdexcept>

namespace trygvis {
namespace soil_moisture {

using namespace std;
using json = nlohmann::json;

// Stores the sample by appending it to `samples`.
void VectorSampleOutputStream::write(SampleRecord const &sample) {
    samples.emplace_back(sample);
}

// Creates a CSV writer that emits to `stream` using the columns in `dict`.
// NOTE(review): the pointee type of `stream` was lost from the text;
// std::ostream is inferred from the `<<`/`endl` usage below - confirm against
// the header.
CsvSampleOutputStream::CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict)
        : stream(move(stream)), headerWritten(false), dict(dict) {
}

// Writes one sample as a CSV row. The header row is written before the first
// data row. Columns follow the order of `dict`; a key without a value in this
// sample produces an empty field.
void CsvSampleOutputStream::write(SampleRecord const &sample) {
    // Skip empty records
    if (sample.empty()) {
        return;
    }

    // Build the dict with the keys from the first sample.
    if (dict.empty()) {
        SampleKeyIndex index = 0;
        for (auto it = sample.cbegin(); it != sample.cend(); ++it, ++index) {
            const auto &value = *it;
            if (value) {
                dict.indexOf(sample.dict.at(index)->name);
            }
        }
    }

    if (!headerWritten) {
        writeHeader();
        headerWritten = true;
    }

    auto &out = *stream;

    bool first = true;
    for (auto &key : dict) {
        // The separator is written for every column after the first, whether
        // or not the column has a value, so that fields stay aligned with the
        // header.
        if (!first) {
            out << ",";
        }
        first = false;

        // Keys are matched by name because the sample carries its own dictionary.
        auto sampleKey = sample.dict.indexOf(key->name);
        auto value = sample.at(sampleKey);

        if (value) {
            out << value.get();
        }
    }

    out << endl;
}

// Writes the names of all keys in `dict`, comma separated, followed by a newline.
void CsvSampleOutputStream::writeHeader() {
    auto &out = *stream;

    bool first = true;
    for (auto &key : dict) {
        if (!first) {
            out << ",";
        }
        first = false;

        out << key->name;
    }

    out << endl;
}

// Creates a JSON writer that emits one document per sample to `stream`.
// NOTE(review): the pointee type of `stream` was lost from the text;
// std::ostream is inferred from usage - confirm against the header.
JsonSampleOutputStream::JsonSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict)
        : dict(dict), stream(move(stream)) {
}

// Writes one sample as a JSON document on its own line.
//
// When `dict` already has keys, only those keys are written. When it is empty,
// every key of the sample that has a value is written and registered in `dict`.
void JsonSampleOutputStream::write(SampleRecord const &sample) {
    // Skip empty records
    if (sample.empty()) {
        return;
    }

    json doc({});

    if (!dict.empty()) {
        for (auto &key : dict) {
            auto sampleKey = sample.dict.indexOf(key->name);
            auto value = sample.at(sampleKey);

            if (value) {
                doc[key->name] = value.get();
            }
        }
    } else {
        for (auto &sampleKey : sample.dict) {
            auto value = sample.at(sampleKey);

            if (value) {
                // Make sure that the key is registered in the dictionary
                dict.indexOf(sampleKey->name);
                doc[sampleKey->name] = value.get();
            }
        }
    }

    *stream << doc << endl;
}

KeyValueSampleOutputStream::KeyValueSampleOutputStream(shared_ptr stream, KeyDictionary &dict) : dict(dict), stream(move(stream)) { } void KeyValueSampleOutputStream::write(SampleRecord const &sample) { // Skip empty records if (sample.empty()) { return; } auto &s = *stream.get(); bool first = true; if (!dict.empty()) { for (auto &key: dict) { auto sampleKey = sample.dict.indexOf(key->name); auto value = sample.at(sampleKey); if (value) { if (first) { first = false; } else { s << ", "; } s << key->name << "=" << value.get(); } } } else { for (auto &sampleKey: sample.dict) { auto o = sample.at(sampleKey); if (o) { if (first) { first = false; } else { s << ", "; } // Make sure that the key is registered in the dictionary dict.indexOf(sampleKey->name); s << sampleKey->name << "=" << o.get(); } } } *stream.get() << endl; } SqlSampleOutputStream::SqlSampleOutputStream(shared_ptr stream, KeyDictionary &dict, string table_name) : dict(dict), stream(move(stream)), table_name(table_name) { } void SqlSampleOutputStream::write(SampleRecord const &values) { throw sample_exception("deimplemented"); // string fs, vs; // // fs.reserve(1024); // vs.reserve(1024); // // if (filter_fields) { // auto i = fields.begin(); // // while (i != fields.end()) { // auto field = *i; // // fs += field; // // auto value = values.find(field); // // if (value != values.end()) { // vs += "'" + value->second + "'"; // } else { // vs += "NULL"; // } // // i++; // // if (i != fields.end()) { // fs += ","; // vs += ","; // } // } // } else { // auto i = values.begin(); // while (i != values.end()) { // auto v = *i++; // // fs += v.first; // vs += "'" + v.second + "'"; // // if (i != values.end()) { // fs += ","; // vs += ","; // } // } // } // // (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl; } void KeyValueSampleParser::process(mutable_buffers_1 buffer) { size_t size = buffer_size(buffer); if (size == 0 && line->size()) { process_line(line); line = 
make_shared>(); return; } auto data = boost::asio::buffer_cast(buffer); for (int i = 0; i < size; i++) { uint8_t b = data[i]; if (b == packet_delimiter) { process_line(line); line = make_shared>(); } else { line->push_back(b); } } } void KeyValueSampleParser::process_line(shared_ptr> packet) { auto timestamp = std::chrono::system_clock::now().time_since_epoch().count(); auto s = std::string((char *) packet->data(), packet->size()); static const boost::regex e("([#_a-zA-Z0-9]+) *= *([0-9]+)"); auto start = s.cbegin(); auto end = s.cend(); boost::match_results what; boost::match_flag_type flags = boost::match_default; SampleRecord sample(dict); while (regex_search(start, end, what, e, flags)) { auto name = static_cast(what[1]); auto value = static_cast(what[2]); start = what[0].second; auto key = dict.indexOf(name); sample.set(key, value); flags |= boost::match_prev_avail; flags |= boost::match_not_bob; } output->write(sample); } AutoSampleParser::AutoSampleParser(shared_ptr output, KeyDictionary &dict) : SampleStreamParser(sample_format_type::AUTO), keyValueParser(new KeyValueSampleParser(output, dict)) { // Directly select the parser now until we have more than one parser parser = std::move(keyValueParser); type_ = sample_format_type::KEY_VALUE; } void AutoSampleParser::process(mutable_buffers_1 buffer) { if (parser) { parser->process(buffer); } else { throw runtime_error("Not implemented yet"); } } string to_string(const sample_format_type &arg) { if (arg == sample_format_type::AUTO) return "auto"; else if (arg == sample_format_type::CSV) return "csv"; else if (arg == sample_format_type::JSON) return "json"; else if (arg == sample_format_type::KEY_VALUE) return "key-value"; else if (arg == sample_format_type::SQL) return "sql"; else throw std::runtime_error("Unknown format value: " + to_string(arg)); } std::ostream& operator<<(std::ostream& os, sample_format_type const& type) { return os << to_string(type); } std::istream& operator>>(std::istream& is, 
sample_format_type& type) { string s; is >> s; if (s == "auto") { type = sample_format_type::AUTO; } else if (s == "csv") { type = sample_format_type::CSV; } else if (s == "key-value") { type = sample_format_type::KEY_VALUE; } else if (s == "json") { type = sample_format_type::JSON; } else if (s == "sql") { type = sample_format_type::SQL; } return is; } unique_ptr open_sample_input_stream( shared_ptr output, KeyDictionary &dict, sample_format_type type) { if (type == sample_format_type::KEY_VALUE) { return make_unique(output, dict); } else if (type == sample_format_type::AUTO) { return make_unique(output, dict); } else { throw sample_exception("No parser for format type: " + to_string(type)); } } unique_ptr open_sample_output_stream( shared_ptr output, KeyDictionary &dict, sample_format_type type) { if (type == sample_format_type::CSV) { return make_unique(output, dict); } else if (type == sample_format_type::KEY_VALUE) { return make_unique(output, dict); } else if (type == sample_format_type::JSON) { return make_unique(output, dict); // } else if (type == sample_format_type::SQL) { // return make_unique(dict, move(output), table_name); } else { throw sample_exception("No writer for format type: " + to_string(type)); } } } }