#include "SoilMoistureIo.h"

#include "json.hpp"

// NOTE(review): the targets of the three system includes and all template
// arguments in this file were not legible in the reviewed text. The headers
// and template arguments below are reconstructed from how the names are used
// in this file; confirm them against SoilMoistureIo.h and version control.
#include <boost/regex.hpp>
#include <chrono>
#include <memory>
#include <string>
#include <vector>

namespace trygvis {
namespace soil_moisture {

using namespace std;
using json = nlohmann::json;

namespace {

/**
 * Formats a value as a single-quoted SQL string literal.
 *
 * Embedded single quotes are doubled so that a value containing a quote
 * cannot terminate the literal early.
 *
 * @param value raw text to quote.
 * @return the quoted literal, including the surrounding quotes.
 */
string sql_quote(const string &value) {
    string quoted;
    quoted.reserve(value.size() + 2);
    quoted += '\'';
    for (const char c : value) {
        if (c == '\'') {
            quoted += '\'';
        }
        quoted += c;
    }
    quoted += '\'';
    return quoted;
}

} // namespace

/**
 * Creates a CSV writer that emits every value of each sample.
 *
 * No field list is stored in this mode, so writeHeader() emits nothing.
 */
CsvSampleOutputStream::CsvSampleOutputStream(ostream &stream) :
        stream(stream), filterFields(false), headerWritten(false) {
}

/**
 * Creates a CSV writer that emits only the given fields, in the given order.
 *
 * @param stream destination stream.
 * @param fields names of the sample entries to write; also used as the header row.
 */
CsvSampleOutputStream::CsvSampleOutputStream(ostream &stream, vector<string> fields) :
        stream(stream), fields(fields), filterFields(true), headerWritten(false) {
}

/**
 * Writes one sample as a single CSV row, preceded by the header on the first call.
 *
 * Filtered mode: one column per configured field, written unquoted; a field
 * that is absent from the sample yields an empty column.
 * Unfiltered mode: every value of the sample, each wrapped in double quotes.
 *
 * @param values the sample to write.
 */
void CsvSampleOutputStream::write(Sample values) {
    if (!headerWritten) {
        writeHeader();
        headerWritten = true;
    }

    bool first = true;

    if (filterFields) {
        for (const auto &field : fields) {
            if (!first) {
                stream << ",";
            }
            first = false;

            const auto value = values.find(field);
            if (value != values.end()) {
                stream << value->second;
            }
        }
    } else {
        for (const auto &entry : values) {
            if (!first) {
                stream << ",";
            }
            first = false;

            stream << "\"" << entry.second << "\"";
        }
    }

    stream << endl;
}

/**
 * Writes the configured field names as a comma separated header row.
 *
 * Nothing is written when no fields are configured.
 */
void CsvSampleOutputStream::writeHeader() {
    if (fields.empty()) {
        return;
    }

    bool first = true;
    for (const auto &field : fields) {
        if (!first) {
            stream << ",";
        }
        first = false;

        stream << field;
    }

    stream << endl;
}

/**
 * Creates a JSON writer that emits every entry of each sample.
 */
JsonSampleOutputStream::JsonSampleOutputStream(ostream &stream) :
        stream(stream), fields(), filterFields(false) {
}

/**
 * Creates a JSON writer that emits only the given fields.
 *
 * @param stream destination stream.
 * @param fields names of the sample entries to include in each document.
 */
JsonSampleOutputStream::JsonSampleOutputStream(ostream &stream, vector<string> fields) :
        stream(stream), fields(fields), filterFields(true) {
}

/**
 * Writes one sample as a JSON document followed by a line break.
 *
 * In filtered mode, configured fields that are absent from the sample are
 * left out of the document.
 *
 * @param values the sample to write.
 */
void JsonSampleOutputStream::write(Sample values) {
    json doc({});

    if (filterFields) {
        for (const auto &field : fields) {
            const auto value = values.find(field);
            if (value != values.end()) {
                doc[field] = value->second;
            }
        }
    } else {
        for (const auto &entry : values) {
            doc[entry.first] = entry.second;
        }
    }

    stream << doc << endl;
}

/**
 * Creates an SQL writer that inserts every entry of each sample.
 *
 * @param stream destination stream.
 * @param table_name table named in the generated INSERT statements.
 */
SqlSampleOutputStream::SqlSampleOutputStream(ostream &stream, string table_name) :
        stream(stream), table_name(table_name), filter_fields(false) {
}

/**
 * Creates an SQL writer that inserts only the given fields.
 *
 * @param stream destination stream.
 * @param table_name table named in the generated INSERT statements.
 * @param fields column names to insert, in order.
 */
SqlSampleOutputStream::SqlSampleOutputStream(ostream &stream, string table_name, vector<string> fields) :
        stream(stream), table_name(table_name), fields(fields), filter_fields(true) {
}

/**
 * Writes one sample as an "INSERT INTO table(cols) VALUES(vals);" statement.
 *
 * Filtered mode: one column per configured field; a field that is absent
 * from the sample is inserted as NULL.
 * Unfiltered mode: one column per entry of the sample.
 *
 * Values are written as quoted string literals with embedded quotes doubled.
 * The table name and column names are written verbatim, so they must come
 * from a trusted source.
 *
 * @param values the sample to write.
 */
void SqlSampleOutputStream::write(Sample values) {
    string fs, vs;

    fs.reserve(1024);
    vs.reserve(1024);

    bool first = true;

    if (filter_fields) {
        for (const auto &field : fields) {
            if (!first) {
                fs += ",";
                vs += ",";
            }
            first = false;

            fs += field;

            const auto value = values.find(field);
            if (value != values.end()) {
                vs += sql_quote(value->second);
            } else {
                // Keeps the value list aligned with the column list.
                vs += "NULL";
            }
        }
    } else {
        for (const auto &entry : values) {
            if (!first) {
                fs += ",";
                vs += ",";
            }
            first = false;

            fs += entry.first;
            vs += sql_quote(entry.second);
        }
    }

    stream << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl;
}

/**
 * Feeds a chunk of raw bytes to the parser.
 *
 * Bytes are appended to the pending line. Each time the packet delimiter is
 * seen, the pending line is handed to process_line() and a fresh line buffer
 * is started, so a line may span several calls.
 *
 * @param buffer the bytes to consume.
 */
void CsvParser::process(mutable_buffers_1 buffer) {
    const size_t size = buffer_size(buffer);
    const auto data = boost::asio::buffer_cast<const uint8_t *>(buffer);

    for (size_t i = 0; i < size; i++) {
        const uint8_t b = data[i];

        if (b == packet_delimiter) {
            process_line(line);
            line = make_shared<vector<uint8_t>>();
        } else {
            line->push_back(b);
        }
    }
}

/**
 * Extracts every "key=value" pair from one line and forwards them as a sample.
 *
 * Keys consist of letters, digits and underscores; values consist of digits
 * only. When a key occurs more than once the last value wins. Lines without
 * any pair produce no output.
 *
 * NOTE(review): the element type of the packet vector is reconstructed as
 * uint8_t from the bytes pushed in process(); it must match the declaration
 * in SoilMoistureIo.h.
 *
 * @param packet the bytes of one line, without the delimiter.
 */
void CsvParser::process_line(shared_ptr<vector<uint8_t>> packet) {
    const string s(reinterpret_cast<const char *>(packet->data()), packet->size());

    static const boost::regex e("([_a-zA-Z0-9]+)=([0-9]+)");

    string::const_iterator start = s.begin();
    const string::const_iterator end = s.end();
    boost::match_results<string::const_iterator> what;
    boost::match_flag_type flags = boost::match_default;

    Sample sample;

    while (boost::regex_search(start, end, what, e, flags)) {
        sample[what[1].str()] = what[2].str();

        // Continue searching right after the current match.
        start = what[0].second;

        // The next search no longer starts at the beginning of the text.
        flags |= boost::match_prev_avail;
        flags |= boost::match_not_bob;
    }

    if (sample.begin() != sample.end()) {
        output->write(sample);
    }
}

} // namespace soil_moisture
} // namespace trygvis