aboutsummaryrefslogtreecommitdiff
path: root/apps
diff options
context:
space:
mode:
Diffstat (limited to 'apps')
-rw-r--r--apps/SoilMoistureIo.cpp272
-rw-r--r--apps/SoilMoistureIo.h197
-rw-r--r--apps/sample-convert.cpp9
-rw-r--r--apps/sample-timestamp.cpp59
-rw-r--r--apps/sm-serial-read.cpp10
5 files changed, 347 insertions, 200 deletions
diff --git a/apps/SoilMoistureIo.cpp b/apps/SoilMoistureIo.cpp
index 315f48d..b8a8b64 100644
--- a/apps/SoilMoistureIo.cpp
+++ b/apps/SoilMoistureIo.cpp
@@ -11,19 +11,51 @@ namespace soil_moisture {
using namespace std;
using json = nlohmann::json;
-void VectorSampleOutputStream::write(Sample sample) {
+void VectorSampleOutputStream::write(SampleRecord sample) {
samples.emplace_back(sample);
}
-CsvSampleOutputStream::CsvSampleOutputStream(unique_ptr<ostream> stream) :
- stream(move(stream)), filterFields(false), headerWritten(false) {
+vector<KeyDictionary::index_t> KeyDictionary::findIndexes(vector<SampleKey> keys) {
+ vector<KeyDictionary::index_t> indexes;
+
+ for (auto &key: keys) {
+ auto index = indexOf(key);
+ indexes.push_back(index);
+ }
+
+ return move(indexes);
+}
+
+CsvSampleOutputStream::CsvSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream) :
+ dict(dict), stream(move(stream)), headerWritten(false) {
}
-CsvSampleOutputStream::CsvSampleOutputStream(unique_ptr<ostream> stream, vector<string> fields) :
- stream(move(stream)), fields(fields), filterFields(true), headerWritten(false) {
+CsvSampleOutputStream::CsvSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream, vector<SampleKey> fieldKeys)
+ :
+ dict(dict), stream(move(stream)), headerWritten(false), fields(dict.findIndexes(fieldKeys)) {
}
-void CsvSampleOutputStream::write(Sample values) {
+void CsvSampleOutputStream::write(SampleRecord values) {
+ // Skip empty records
+ if (values.empty()) {
+ return;
+ }
+
+ if (fields.empty()) {
+ KeyDictionary::index_t index = 0;
+ auto ptr = values.begin();
+ while (ptr != values.end()) {
+ auto o = *ptr;
+
+ if (o) {
+ fields.push_back(index);
+ }
+
+ ptr++;
+ index++;
+ }
+ }
+
if (!headerWritten) {
writeHeader();
headerWritten = true;
@@ -31,27 +63,17 @@ void CsvSampleOutputStream::write(Sample values) {
auto &s = *stream.get();
- if (filterFields) {
- auto i = fields.begin();
- while (i != fields.end()) {
- if (i != fields.begin()) {
- s << ",";
- }
-
- auto field = *i++;
- auto value = values.find(field);
-
- if (value != values.end()) {
- s << value->second;
- }
+ auto i = fields.begin();
+ while (i != fields.end()) {
+ if (i != fields.begin()) {
+ s << ",";
}
- } else {
- for (auto i = values.begin(); i != values.end();) {
- s << "\"" << (*i).second << "\"";
- if (++i != values.end()) {
- s << ",";
- }
+ auto index = *i++;
+ auto o = values.at(index);
+
+ if (o) {
+ s << o.get();
}
}
@@ -59,15 +81,11 @@ void CsvSampleOutputStream::write(Sample values) {
}
void CsvSampleOutputStream::writeHeader() {
- if (fields.size() == 0) {
- return;
- }
-
auto &s = *stream.get();
auto i = fields.begin();
while (i != fields.end()) {
- s << *i;
+ s << dict.nameOf(*i);
i++;
@@ -79,95 +97,112 @@ void CsvSampleOutputStream::writeHeader() {
s << endl;
}
-JsonSampleOutputStream::JsonSampleOutputStream(unique_ptr<ostream> stream) :
- stream(move(stream)), fields(), filterFields(false) {
+JsonSampleOutputStream::JsonSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream) :
+ dict(dict), stream(move(stream)), filterFields(false) {
}
-JsonSampleOutputStream::JsonSampleOutputStream(unique_ptr<ostream> stream, vector<string> fields) :
- stream(move(stream)), fields(fields), filterFields(true) {
+JsonSampleOutputStream::JsonSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream, vector<SampleKey> fields)
+ :
+ dict(dict), stream(move(stream)), fields(dict.findIndexes(fields)), filterFields(true) {
}
-void JsonSampleOutputStream::write(Sample values) {
- json doc({});
+void JsonSampleOutputStream::write(SampleRecord values) {
+ throw sample_exception("deimplemented");
- if (filterFields) {
- for (auto &f: fields) {
- auto value = values.find(f);
+ json doc({});
- if (value != values.end()) {
- doc[f] = value->second;
- }
- }
- } else {
- for (auto &v: values) {
- doc[v.first] = v.second;
- }
- }
+// if (filterFields) {
+// for (auto &f: fields) {
+// auto value = values.find(f);
+//
+// if (value != values.end()) {
+// doc[f] = value->second;
+// }
+// }
+// } else {
+// for (auto &v: values) {
+// doc[v.first] = v.second;
+// }
+// }
*stream.get() << doc << endl;
}
-SqlSampleOutputStream::SqlSampleOutputStream(unique_ptr<ostream> stream, string table_name) :
- stream(move(stream)), table_name(table_name), filter_fields(false) {
+SqlSampleOutputStream::SqlSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream, string table_name) :
+ dict(dict), stream(move(stream)), table_name(table_name), filter_fields(false) {
}
-SqlSampleOutputStream::SqlSampleOutputStream(unique_ptr<ostream> stream, string table_name, vector<string> fields) :
- stream(move(stream)), table_name(table_name), fields(fields), filter_fields(true) {
+SqlSampleOutputStream::SqlSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream, string table_name, vector<SampleKey> fields)
+ :
+ dict(dict),
+ stream(move(stream)),
+ table_name(table_name),
+ fields(dict.findIndexes(fields)),
+ filter_fields(true) {
}
-void SqlSampleOutputStream::write(Sample values) {
- string fs, vs;
-
- fs.reserve(1024);
- vs.reserve(1024);
-
- if (filter_fields) {
- auto i = fields.begin();
-
- while (i != fields.end()) {
- auto field = *i;
-
- fs += field;
-
- auto value = values.find(field);
-
- if (value != values.end()) {
- vs += "'" + value->second + "'";
- } else {
- vs += "NULL";
- }
-
- i++;
+void SqlSampleOutputStream::write(SampleRecord values) {
+ throw sample_exception("deimplemented");
+
+// string fs, vs;
+//
+// fs.reserve(1024);
+// vs.reserve(1024);
+//
+// if (filter_fields) {
+// auto i = fields.begin();
+//
+// while (i != fields.end()) {
+// auto field = *i;
+//
+// fs += field;
+//
+// auto value = values.find(field);
+//
+// if (value != values.end()) {
+// vs += "'" + value->second + "'";
+// } else {
+// vs += "NULL";
+// }
+//
+// i++;
+//
+// if (i != fields.end()) {
+// fs += ",";
+// vs += ",";
+// }
+// }
+// } else {
+// auto i = values.begin();
+// while (i != values.end()) {
+// auto v = *i++;
+//
+// fs += v.first;
+// vs += "'" + v.second + "'";
+//
+// if (i != values.end()) {
+// fs += ",";
+// vs += ",";
+// }
+// }
+// }
+//
+// (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl;
+}
- if (i != fields.end()) {
- fs += ",";
- vs += ",";
- }
- }
- } else {
- auto i = values.begin();
- while (i != values.end()) {
- auto v = *i++;
+void CsvSampleParser::process(mutable_buffers_1 buffer) {
- fs += v.first;
- vs += "'" + v.second + "'";
+ size_t size = buffer_size(buffer);
- if (i != values.end()) {
- fs += ",";
- vs += ",";
- }
- }
+ if (size == 0 && line->size()) {
+ process_line(line);
+ line = make_shared<vector<uint8_t>>();
+ return;
}
- (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl;
-}
-
-void CsvSampleParser::process(mutable_buffers_1 buffer) {
-
- size_t some = buffer_size(buffer);
auto data = boost::asio::buffer_cast<const uint8_t *>(buffer);
- for (int i = 0; i < some; i++) {
+ for (int i = 0; i < size; i++) {
uint8_t b = data[i];
if (b == packet_delimiter) {
@@ -191,7 +226,7 @@ void CsvSampleParser::process_line(shared_ptr<vector<uint8_t>> packet) {
boost::match_results<std::string::const_iterator> what;
boost::match_flag_type flags = boost::match_default;
- Sample sample;
+ SampleRecord sample(dict);
while (regex_search(start, end, what, e, flags)) {
auto key = static_cast<string>(what[1]);
@@ -201,19 +236,18 @@ void CsvSampleParser::process_line(shared_ptr<vector<uint8_t>> packet) {
map<string, string> values;
values[key] = value;
- sample.set(key, value);
+ auto index = dict.indexOf(key);
+ sample.set(index, value);
flags |= boost::match_prev_avail;
flags |= boost::match_not_bob;
}
- if (sample.begin() != sample.end()) {
- output->write(sample);
- }
+ output->write(sample);
}
-AutoSampleParser::AutoSampleParser(shared_ptr<SampleOutputStream> output) :
- SampleStreamParser(sample_format_type::AUTO), csvParser(new CsvSampleParser(output)) {
+AutoSampleParser::AutoSampleParser(KeyDictionary &dict, shared_ptr<SampleOutputStream> output) :
+ SampleStreamParser(sample_format_type::AUTO), csvParser(new CsvSampleParser(dict, output)) {
// Directly select the parser now until we have more than one parser
parser = std::move(csvParser);
type_ = sample_format_type::CSV;
@@ -240,34 +274,42 @@ string to_string(const sample_format_type &arg) {
throw std::runtime_error("Unknown format value: " + to_string(arg));
}
-unique_ptr<SampleStreamParser> open_sample_input_stream(shared_ptr<SampleOutputStream> output, sample_format_type type) {
+unique_ptr<SampleStreamParser> open_sample_input_stream(
+ KeyDictionary &dict,
+ shared_ptr<SampleOutputStream> output,
+ sample_format_type type) {
if (type == sample_format_type::CSV) {
- return make_unique<CsvSampleParser>(output);
+ return make_unique<CsvSampleParser>(dict, output);
} else if (type == sample_format_type::AUTO) {
- return make_unique<AutoSampleParser>(output);
+ return make_unique<AutoSampleParser>(dict, output);
} else {
throw sample_exception("Unsupported format type: " + to_string(type));
}
}
-unique_ptr<SampleOutputStream> open_sample_output_stream(sample_format_type type, unique_ptr<ostream> output, boost::optional<vector<string>> fields) {
+unique_ptr<SampleOutputStream> open_sample_output_stream(
+ KeyDictionary &dict,
+ sample_format_type type,
+ unique_ptr<ostream> output,
+ o<vector<SampleKey>> fields) {
+
if (type == sample_format_type::CSV) {
if (fields) {
- return make_unique<CsvSampleOutputStream>(move(output), fields.get());
+ return make_unique<CsvSampleOutputStream>(dict, move(output), fields.get());
} else {
- return make_unique<CsvSampleOutputStream>(move(output));
+ return make_unique<CsvSampleOutputStream>(dict, move(output));
}
} else if (type == sample_format_type::JSON) {
if (fields) {
- return make_unique<JsonSampleOutputStream>(move(output), fields.get());
+ return make_unique<JsonSampleOutputStream>(dict, move(output), fields.get());
} else {
- return make_unique<JsonSampleOutputStream>(move(output));
+ return make_unique<JsonSampleOutputStream>(dict, move(output));
}
// } else if (type == sample_format_type::SQL) {
// if (fields) {
-// return make_unique<SqlSampleOutputStream>(move(output), table_name, fields.get());
+// return make_unique<SqlSampleOutputStream>(dict, move(output), table_name, fields.get());
// } else {
-// return make_unique<SqlSampleOutputStream>(move(output), table_name);
+// return make_unique<SqlSampleOutputStream>(dict, move(output), table_name);
// }
} else {
throw sample_exception("Unsupported format type: " + to_string(type));
diff --git a/apps/SoilMoistureIo.h b/apps/SoilMoistureIo.h
index b8f08e9..b8f0b52 100644
--- a/apps/SoilMoistureIo.h
+++ b/apps/SoilMoistureIo.h
@@ -33,121 +33,216 @@ class SampleStreamParser;
class SampleOutputStream;
-unique_ptr<SampleStreamParser> open_sample_input_stream(shared_ptr<SampleOutputStream> output, sample_format_type type = sample_format_type::AUTO);
+class KeyDictionary;
-unique_ptr<SampleOutputStream> open_sample_output_stream(sample_format_type type, unique_ptr<ostream> output,
- o<vector<string>> fields = o<vector<string>>());
+class SampleKey;
-class Sample {
+unique_ptr<SampleStreamParser> open_sample_input_stream(
+ KeyDictionary &dict,
+ shared_ptr<SampleOutputStream> output,
+ sample_format_type type = sample_format_type::AUTO);
+
+unique_ptr<SampleOutputStream> open_sample_output_stream(
+ KeyDictionary &dict,
+ sample_format_type type,
+ unique_ptr<ostream> output,
+ o<vector<SampleKey>> fields = o<vector<SampleKey>>());
+
+class sample_exception : public runtime_error {
public:
- Sample() : entries() {
+ sample_exception(const string &what) : runtime_error(what) {
}
+};
- Sample(map<string, string> entries) : entries(entries) {
+struct SampleKey {
+ // TODO: only the dictionary should be able to create keys
+ SampleKey(string &name) : name(name) {
+ if (name.length() == 0) {
+ throw sample_exception("Bad sample key.");
+ }
}
- map<string, string>::iterator find(string &s) {
- return entries.find(s);
+ inline
+ bool operator==(const SampleKey &that) const {
+ return name == that.name;
}
- map<string, string>::iterator begin() {
- return entries.begin();
- }
+ string name;
+};
+
+class KeyDictionary {
+public:
+ typedef vector<SampleKey> v;
+ typedef v::size_type index_t;
- map<string, string>::iterator end() {
- return entries.end();
+ KeyDictionary() {
}
- /**
- * @throws std::out_of_range
- */
- inline const string &operator[](string key) {
- return at(key);
+ index_t indexOf(const SampleKey key) {
+ index_t i = 0;
+ for (auto ptr = keys.begin(); ptr != keys.end(); ptr++, i++) {
+ if (*ptr == key) {
+ return i;
+ }
+ }
+
+ keys.push_back(key);
+
+ return keys.size() - 1;
}
- /**
- * @throws std::out_of_range
- */
- const string &at(string key) {
- return entries.at(key);
+ vector<index_t> findIndexes(v keys);
+
+ inline
+ v::const_iterator begin() {
+ return keys.begin();
}
- template<class A>
- const A lexical_at(string key) {
- return boost::lexical_cast<A>(entries.at(key));
+ inline
+ v::const_iterator end() {
+ return keys.end();
}
- void set(const std::string &key, const std::string &value) {
- entries[key] = value;
+ string nameOf(index_t index) {
+ return keys.at(index).name;
}
private:
- map<string, string> entries;
+ v keys;
};
-class sample_exception : public runtime_error {
+class SampleRecord {
public:
- sample_exception(const string &what) : runtime_error(what) {
+ typedef vector<o<string>> vec;
+
+ SampleRecord(KeyDictionary &dict) : dict(dict) {
+ }
+
+ SampleRecord(KeyDictionary &dict, vec values)
+ : dict(dict), values(values) {
+ }
+
+ inline
+ vec::const_iterator begin() {
+ return values.begin();
+ }
+
+ inline
+ vec::const_iterator end() {
+ return values.end();
}
+
+ inline
+ bool empty() {
+ return values.empty();
+ }
+
+ o<string> at(size_t index) {
+ if (index >= values.size()) {
+ return o<string>();
+ }
+
+ return values.at(index);
+ }
+
+ void set(const KeyDictionary::index_t index, const std::string &value) {
+ values.resize(max(values.size(), index + 1));
+
+ values[index] = o<string>(value);
+ }
+
+ template<class A>
+ const o<A> lexical_at(KeyDictionary::index_t index) {
+ auto value = at(index);
+
+ if (!value) {
+ return o<A>();
+ }
+
+ return o<A>(boost::lexical_cast<A>(value.get()));
+ }
+
+ string to_string() {
+ KeyDictionary::index_t i = 0;
+ string s;
+ for (auto ptr = values.begin(); ptr != values.end(); ptr++, i++) {
+ auto o = *ptr;
+
+ if (!o) {
+ continue;
+ }
+
+ auto value = o.get();
+
+ s += dict.nameOf(i) + " = " + value + ", ";
+ }
+ return s;
+ }
+
+private:
+ KeyDictionary &dict;
+ vec values;
};
class SampleOutputStream {
public:
- virtual void write(Sample sample) = 0;
+ virtual void write(SampleRecord sample) = 0;
};
class VectorSampleOutputStream : public SampleOutputStream {
public:
- virtual void write(Sample sample) override;
+ virtual void write(SampleRecord sample) override;
public:
- vector<Sample> samples;
+ vector<SampleRecord> samples;
};
class CsvSampleOutputStream : public SampleOutputStream {
public:
- CsvSampleOutputStream(unique_ptr<ostream> stream);
+ CsvSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream);
- CsvSampleOutputStream(unique_ptr<ostream> stream, vector<string> fields);
+ CsvSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream, vector<SampleKey> fields);
- void write(Sample values);
+ void write(SampleRecord values);
private:
void writeHeader();
+ KeyDictionary &dict;
unique_ptr<ostream> stream;
bool headerWritten;
- bool filterFields;
- vector<string> fields;
+ vector<KeyDictionary::index_t> fields;
};
class JsonSampleOutputStream : public SampleOutputStream {
public:
- JsonSampleOutputStream(unique_ptr<ostream> stream);
+ JsonSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream);
- JsonSampleOutputStream(unique_ptr<ostream> stream, vector<string> fields);
+ JsonSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream, vector<SampleKey> fields);
- void write(Sample values);
+ void write(SampleRecord values);
private:
+ KeyDictionary &dict;
unique_ptr<ostream> stream;
bool filterFields;
- vector<string> fields;
+ vector<KeyDictionary::index_t> fields;
};
class SqlSampleOutputStream : public SampleOutputStream {
public:
- SqlSampleOutputStream(unique_ptr<ostream> stream, string table_name);
+ SqlSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream, string table_name);
- SqlSampleOutputStream(unique_ptr<ostream> stream, string table_name, vector<string> fields);
+ SqlSampleOutputStream(KeyDictionary &dict, unique_ptr<ostream> stream, string table_name, vector<SampleKey> fields);
- void write(Sample values);
+ void write(SampleRecord values);
private:
+ KeyDictionary &dict;
unique_ptr<ostream> stream;
bool filter_fields;
- vector<string> fields;
+ vector<KeyDictionary::index_t> fields;
const string table_name;
};
@@ -169,8 +264,9 @@ protected:
class CsvSampleParser : public SampleStreamParser {
public:
- CsvSampleParser(shared_ptr<SampleOutputStream> output) : SampleStreamParser(sample_format_type::CSV),
- output(output), line(make_shared<vector<uint8_t>>()) {
+ CsvSampleParser(KeyDictionary &dict, shared_ptr<SampleOutputStream> output) :
+ SampleStreamParser(sample_format_type::CSV), dict(dict), output(output),
+ line(make_shared<vector<uint8_t>>()) {
}
void process(mutable_buffers_1 buffer) override;
@@ -179,13 +275,14 @@ private:
void process_line(shared_ptr<vector<uint8_t>> packet);
static const uint8_t packet_delimiter = '\n';
+ KeyDictionary &dict;
shared_ptr<SampleOutputStream> output;
shared_ptr<vector<uint8_t>> line;
};
class AutoSampleParser : public SampleStreamParser {
public:
- AutoSampleParser(shared_ptr<SampleOutputStream> output);
+ AutoSampleParser(KeyDictionary &dict, shared_ptr<SampleOutputStream> output);
private:
unique_ptr<SampleStreamParser> parser;
diff --git a/apps/sample-convert.cpp b/apps/sample-convert.cpp
index 249b737..b3e5c02 100644
--- a/apps/sample-convert.cpp
+++ b/apps/sample-convert.cpp
@@ -35,6 +35,7 @@ public:
auto desc = execution.desc;
auto vm = execution.vm;
+ KeyDictionary dict;
shared_ptr<SampleOutputStream> output;
istream *inputStream;
@@ -60,22 +61,22 @@ public:
}
if (output_format == "plain") {
- output = make_shared<CsvSampleOutputStream>(move(outputStream));
+ output = make_shared<CsvSampleOutputStream>(dict, move(outputStream));
} else if (output_format == "json") {
- output = make_shared<JsonSampleOutputStream>(move(outputStream));
+ output = make_shared<JsonSampleOutputStream>(dict, move(outputStream));
} else if (output_format == "sql") {
if (table_name.size() == 0) {
cerr << "Missing option: table-name" << endl;
return EXIT_FAILURE;
}
- output = make_shared<SqlSampleOutputStream>(move(outputStream), table_name);
+ output = make_shared<SqlSampleOutputStream>(dict, move(outputStream), table_name);
} else {
cerr << "Unsupported output format: " << output_format << endl;
return EXIT_FAILURE;
}
- auto input = make_shared<CsvSampleParser>(output);
+ auto input = make_shared<CsvSampleParser>(dict, output);
char data[100];
while (!inputStream->eof()) {
diff --git a/apps/sample-timestamp.cpp b/apps/sample-timestamp.cpp
index 3a0b3e0..6ac2f86 100644
--- a/apps/sample-timestamp.cpp
+++ b/apps/sample-timestamp.cpp
@@ -14,21 +14,27 @@ namespace po = boost::program_options;
class TimestampFixingSampleOutputStream : public SampleOutputStream {
public:
- TimestampFixingSampleOutputStream(string timestamp_name, string now_name, time_t start_time, shared_ptr<SampleOutputStream> output) :
- timestamp_name_(timestamp_name), now_name_(now_name), start_time_(start_time), output_(output) {
+ TimestampFixingSampleOutputStream(KeyDictionary dict, string timestamp_name, string now_name, time_t start_time, shared_ptr<SampleOutputStream> output) :
+ timestamp_index(dict.indexOf(timestamp_name)), now_index(dict.indexOf(now_name)), start_time_(start_time), output_(output) {
}
- virtual void write(Sample sample) override {
- long relative_time = sample.lexical_at<long>(now_name_);
+ virtual void write(SampleRecord sample) override {
+ o<long> relative_time_o = sample.lexical_at<long>(now_index);
+
+ if (!relative_time_o) {
+ return;
+ }
+
+ long relative_time = relative_time_o.get();
string new_value = std::to_string(start_time_ + relative_time);
- sample.set(timestamp_name_, new_value);
+ sample.set(timestamp_index, new_value);
output_->write(sample);
};
private:
- string now_name_, timestamp_name_;
+ KeyDictionary::index_t now_index, timestamp_index;
time_t start_time_;
shared_ptr<SampleOutputStream> output_;
};
@@ -37,6 +43,7 @@ class sample_timestamp : public app {
private:
string input_file, timestamp_name, now_name;
+ KeyDictionary::index_t now_index;
public:
sample_timestamp() : input_file("") {
@@ -69,19 +76,18 @@ public:
return EXIT_FAILURE;
}
+ KeyDictionary dict;
+
+ now_index = dict.indexOf(now_name);
+
auto sample_buffer = make_shared<VectorSampleOutputStream>();
- unique_ptr<SampleStreamParser> parser = open_sample_input_stream(sample_buffer);
- while (!input.eof()) {
+ unique_ptr<SampleStreamParser> parser = open_sample_input_stream(dict, sample_buffer);
+ while (!input.fail()) {
char buffer[buffer_size];
input.read(buffer, buffer_size);
+ auto count = (size_t) input.gcount();
- if (input.bad()) {
- cerr << "Error reading input" << endl;
- return EXIT_FAILURE;
- }
-
- size_t count = (size_t) input.gcount();
-
+ cerr << "eof? " << input.eof() << endl;
mutable_buffers_1 b = boost::asio::buffer(buffer, count);
parser->process(b);
}
@@ -93,21 +99,20 @@ public:
time_t end_time = buf.st_mtim.tv_sec;
- Sample sample = *--sample_buffer->samples.end();
+ SampleRecord sample = *--sample_buffer->samples.end();
- string s;
- try {
- s = sample.at(now_name);
- } catch (out_of_range &e) {
+ o<string> s = sample.at(now_index);
+ if (!s) {
cerr << "Missing key '" + now_name + "'." << endl;
+ cerr << "keys: " << sample.to_string() << endl;
return EXIT_FAILURE;
}
long now;
try {
- now = boost::lexical_cast<long>(s);
+ now = boost::lexical_cast<long>(s.get());
} catch (const boost::bad_lexical_cast &e) {
- cerr << "Bad integer value '" + s + "'." << endl;
+ cerr << "Bad integer value '" + s.get() + "'." << endl;
return EXIT_FAILURE;
}
@@ -124,16 +129,16 @@ public:
return EXIT_FAILURE;
}
- auto output_stream = open_sample_output_stream(parser->type(), unique_ptr<ostream>(&cout));
- auto p = make_shared<TimestampFixingSampleOutputStream>("timestamp", now_name, start_time, move(output_stream));
- parser = open_sample_input_stream(p, parser->type());
+ auto output_stream = open_sample_output_stream(dict, parser->type(), unique_ptr<ostream>(&cout));
+ auto p = make_shared<TimestampFixingSampleOutputStream>(dict, "timestamp", now_name, start_time, move(output_stream));
+ parser = open_sample_input_stream(dict, p, parser->type());
int recordCount = 0;
while (!input.eof()) {
char buffer[buffer_size];
-
- size_t gcount = (size_t)input.readsome(buffer, buffer_size);
+ input.read(buffer, buffer_size);
+ size_t gcount = (size_t)input.gcount();
recordCount++;
diff --git a/apps/sm-serial-read.cpp b/apps/sm-serial-read.cpp
index 04a718d..c7fb695 100644
--- a/apps/sm-serial-read.cpp
+++ b/apps/sm-serial-read.cpp
@@ -101,6 +101,8 @@ public:
auto desc = execution.desc;
auto vm = execution.vm;
+ KeyDictionary dict;
+
uint32_t baud_rate = 115200;
auto port_name = vm["port"].as<string>();
@@ -124,17 +126,17 @@ public:
unique_ptr<ostream> outputStream = unique_ptr<ostream>(&cout);
if (format == Format::JSON) {
- output = make_shared<JsonSampleOutputStream>(std::move(outputStream));
+ output = make_shared<JsonSampleOutputStream>(dict, std::move(outputStream));
} else if (format == Format::SQL) {
- output = make_shared<SqlSampleOutputStream>(std::move(outputStream), "raw");
+ output = make_shared<SqlSampleOutputStream>(dict, std::move(outputStream), "raw");
} else if (format == Format::PLAIN) {
- output = make_shared<CsvSampleOutputStream>(std::move(outputStream));
+ output = make_shared<CsvSampleOutputStream>(dict, std::move(outputStream));
} else {
cerr << "Unsupported format: " << boost::lexical_cast<string>(format) << endl;
return EXIT_FAILURE;
}
- shared_ptr<CsvSampleParser> input = make_shared<CsvSampleParser>(output);
+ shared_ptr<CsvSampleParser> input = make_shared<CsvSampleParser>(dict, output);
port_handler(port_name, port, input).run();