#include "SoilMoistureIo.h"

#include "json.hpp"

// NOTE(review): the targets of the three system includes were missing from the
// listing this file was reviewed from. The headers below cover what this
// translation unit uses -- confirm against the original include list.
#include <boost/regex.hpp>
#include <chrono>
#include <map>
#include <sstream>
#include <string>

// NOTE(review): template arguments (vector<string>, shared_ptr<vector<uint8_t>>,
// buffer_cast<const uint8_t *>, match_results<string::const_iterator>) were
// likewise missing and have been restored from how the values are used here.
// They must agree with the declarations in SoilMoistureIo.h -- confirm.

namespace trygvis {
namespace soil_moisture {

using namespace std;
using json = nlohmann::json;

namespace {

/**
 * Returns a copy of `raw` with every single quote doubled, so that the result
 * can be placed between single quotes in an SQL string literal.
 */
string sql_quote(const string &raw) {
    string quoted;
    quoted.reserve(raw.size());
    for (char c : raw) {
        if (c == '\'') {
            quoted += '\'';
        }
        quoted += c;
    }
    return quoted;
}

} // namespace

/**
 * Creates a CSV writer and immediately emits the header row: the field names
 * separated by commas, followed by a newline.
 *
 * An empty field list produces an empty header line instead of dereferencing
 * the end iterator.
 */
CsvSampleOutputStream::CsvSampleOutputStream(ostream &stream, vector<string> fields) :
        stream(stream), fields(fields) {
    bool first = true;
    for (const auto &field : fields) {
        if (!first) {
            stream << ",";
        }
        first = false;
        stream << field;
    }

    stream << endl;
}

/**
 * Writes one CSV row. Columns follow the order of `fields`; a field that is
 * not present in `values` yields an empty column.
 */
void CsvSampleOutputStream::write(Sample values) {
    bool first = true;
    for (const auto &field : fields) {
        if (!first) {
            stream << ",";
        }
        first = false;

        auto value = values.find(field);
        if (value != values.end()) {
            stream << value->second;
        }
    }

    stream << endl;
}

/**
 * Creates a JSON writer. Nothing is written until write() is called.
 */
JsonSampleOutputStream::JsonSampleOutputStream(ostream &stream, vector<string> fields) :
        stream(stream), fields(fields) {
}

/**
 * Writes one JSON document per sample, followed by a newline. Only the
 * configured fields that are present in `values` are copied into the document.
 */
void JsonSampleOutputStream::write(Sample values) {
    json doc({});

    for (auto &f: fields) {
        auto value = values.find(f);

        if (value != values.end()) {
            doc[f] = value->second;
        }
    }

    stream << doc << endl;
}

/**
 * Creates an SQL writer. Nothing is written until write() is called.
 */
SqlSampleOutputStream::SqlSampleOutputStream(ostream &stream, vector<string> fields) :
        stream(stream), fields(fields) {
}

/**
 * Writes one INSERT statement per sample. Every configured field becomes a
 * column; fields missing from `values` are written as NULL, all other values
 * are written as single-quoted literals with embedded quotes doubled.
 *
 * NOTE(review): the statement text has no table name after "INSERT INTO".
 * No table name is available in this file, so the literal is left unchanged --
 * confirm whether one should be supplied by the class.
 */
void SqlSampleOutputStream::write(Sample values) {
    stringstream fs, vs;

    bool first = true;
    for (const auto &field : fields) {
        if (!first) {
            fs << ",";
            vs << ",";
        }
        first = false;

        fs << field;

        auto value = values.find(field);
        if (value != values.end()) {
            // Render through a stream first so this works for whatever
            // streamable type the sample stores, then escape the text.
            ostringstream raw;
            raw << value->second;
            vs << "'" << sql_quote(raw.str()) << "'";
        } else {
            vs << "NULL";
        }
    }

    // Insert the accumulated text; streaming the stringstream objects
    // themselves does not output their contents.
    stream << "INSERT INTO (" << fs.str() << ") VALUES(" << vs.str() << ")" << endl;
}

/**
 * Consumes a chunk of raw bytes. Bytes are appended to the current line until
 * `packet_delimiter` is seen; the completed line is then handed to
 * process_line() and a fresh line buffer is started. A trailing partial line
 * stays buffered in `line` for the next call.
 */
void CsvParser::process(mutable_buffers_1 buffer) {
    const size_t some = buffer_size(buffer);
    auto data = boost::asio::buffer_cast<const uint8_t *>(buffer);

    for (size_t i = 0; i < some; i++) {
        uint8_t b = data[i];

        if (b == packet_delimiter) {
            process_line(line);
            line = make_shared<vector<uint8_t>>();
        } else {
            line->push_back(b);
        }
    }
}

/**
 * Parses one line for `key=value` pairs, where the key consists of letters,
 * digits and underscores and the value of decimal digits. All pairs found are
 * collected into a single sample; if at least one pair was found, the sample
 * is passed to `output`. A key that occurs more than once keeps its last value.
 */
void CsvParser::process_line(shared_ptr<vector<uint8_t>> packet) {
    const std::string s(packet->begin(), packet->end());

    static const boost::regex e("([_a-zA-Z0-9]+)=([0-9]+)");

    std::string::const_iterator start = s.begin();
    const std::string::const_iterator end = s.end();
    boost::match_results<std::string::const_iterator> what;
    boost::match_flag_type flags = boost::match_default;

    Sample sample;

    while (regex_search(start, end, what, e, flags)) {
        auto key = what[1].str();
        auto value = what[2].str();

        // Continue searching right after the text that was just matched.
        start = what[0].second;

        sample[key] = value;

        // From the second search on, `start` is no longer the beginning of
        // the line and there are valid characters in front of it.
        flags |= boost::match_prev_avail;
        flags |= boost::match_not_bob;
    }

    if (sample.begin() != sample.end()) {
        output->write(sample);
    }
}

}
}