From 34669098e138d595aadc39fbf8c0cdd004c0916d Mon Sep 17 00:00:00 2001
From: Trygve Laugstøl <trygvis@inamo.no>
Date: Sat, 28 Mar 2015 22:22:16 +0100
Subject: o Adding back SQL output.

---
 apps/sample-convert.cpp            |  16 ++--
 sensor/include/trygvis/sensor.h    |   3 +
 sensor/include/trygvis/sensor/io.h |  86 ++++++++++-------
 sensor/main/io.cpp                 | 186 +++++++++++++++++++------------------
 4 files changed, 165 insertions(+), 126 deletions(-)

diff --git a/apps/sample-convert.cpp b/apps/sample-convert.cpp
index 1cedf42..baa1447 100644
--- a/apps/sample-convert.cpp
+++ b/apps/sample-convert.cpp
@@ -16,9 +16,14 @@ using boost::tokenizer;
 namespace po = boost::program_options;
 
 class sample_convert : public app {
+private:
     string fields;
     string timestamp_field;
     bool add_timestamp;
+    string input_file, output_file;
+    sample_format_type output_format;
+
+    string table_name;
 
 public:
     void add_options(po::options_description_easy_init &options) override {
@@ -74,6 +79,11 @@ public:
 
         options.push_back(&tf);
 
+        table_name_option tno(table_name);
+        if (table_name != "") {
+            options.push_back(&tno);
+        }
+
         tokenizer<> tok(fields);
         output_fields_option fs;
         std::copy(tok.begin(), tok.end(), std::back_inserter(fs.fields));
@@ -100,12 +110,6 @@ public:
 
         return EXIT_SUCCESS;
     }
-
-private:
-    string input_file, output_file;
-    sample_format_type output_format;
-
-    string table_name;
 };
 
 }
diff --git a/sensor/include/trygvis/sensor.h b/sensor/include/trygvis/sensor.h
index 4662bab..f8cfbe5 100644
--- a/sensor/include/trygvis/sensor.h
+++ b/sensor/include/trygvis/sensor.h
@@ -148,6 +148,9 @@ public:
             : dict(dict), values(values) {
     }
 
+    SampleRecord(const SampleRecord &copy) : dict(copy.dict), values(copy.values) {
+    }
+
     inline
     vec::const_iterator cbegin() const {
         return values.cbegin();
diff --git a/sensor/include/trygvis/sensor/io.h b/sensor/include/trygvis/sensor/io.h
index f92b800..f86f2d9 100644
--- a/sensor/include/trygvis/sensor/io.h
+++ b/sensor/include/trygvis/sensor/io.h
@@ -12,29 +12,37 @@ namespace io {
 using namespace std;
 using namespace boost::asio;
 
-class sample_output_stream_option {
-public:
+struct sample_output_stream_option {
     virtual ~sample_output_stream_option() {
     };
 };
 
-class output_fields_option : public sample_output_stream_option {
-public:
+struct output_fields_option : sample_output_stream_option {
     ~output_fields_option() {
     }
 
     vector<string> fields;
 };
 
-class timestamp_field_option : public sample_output_stream_option {
-public:
-    timestamp_field_option(string name) : name(name) {
+struct timestamp_field_option : sample_output_stream_option {
+    timestamp_field_option(const string name) : name(name) {
     }
 
     ~timestamp_field_option() {
     }
 
-    string name;
+    const string name;
+};
+
+class table_name_option : public sample_output_stream_option {
+public:
+    table_name_option(const string name) : name(name) {
+    }
+
+    ~table_name_option() {
+    }
+
+    const string name;
 };
 
 class sample_output_stream_options : public vector<sample_output_stream_option *> {
@@ -56,6 +64,44 @@ public:
     }
 };
 
+struct missing_required_option_error : runtime_error {
+    missing_required_option_error(string what) : runtime_error(what) {
+    }
+
+    ~missing_required_option_error() {
+    }
+};
+
+class SampleStreamParser;
+
+class SampleOutputStream;
+
+/**
+ * Throws missing_required_option_error
+ */
+unique_ptr<SampleStreamParser> open_sample_stream_parser(
+        shared_ptr<SampleOutputStream> output,
+        KeyDictionary &dict,
+        sample_format_type type = sample_format_type::AUTO);
+
+/**
+ * Throws missing_required_option_error
+ */
+unique_ptr<SampleOutputStream> open_sample_output_stream(
+        shared_ptr<ostream> output,
+        KeyDictionary &dict,
+        sample_format_type type,
+        sample_output_stream_options options);
+
+static inline
+unique_ptr<SampleOutputStream> open_sample_output_stream(
+        shared_ptr<ostream> output,
+        KeyDictionary &dict,
+        sample_format_type type) {
+    sample_output_stream_options options;
+    return open_sample_output_stream(output, dict, type, options);
+}
+
 class SampleOutputStream {
 public:
     virtual void write(SampleRecord const &sample) = 0;
@@ -104,10 +150,6 @@ public:
 
     void write(SampleRecord const &sample);
 
-    const KeyDictionary &getDict() {
-        return dict;
-    }
-
 private:
     void writeHeader();
 
@@ -212,26 +254,6 @@ private:
     unique_ptr<KeyValueSampleStreamParser> keyValueParser;
 };
 
-unique_ptr<SampleStreamParser> open_sample_stream_parser(
-        shared_ptr<SampleOutputStream> output,
-        KeyDictionary &dict,
-        sample_format_type type = sample_format_type::AUTO);
-
-unique_ptr<SampleOutputStream> open_sample_output_stream(
-        shared_ptr<ostream> output,
-        KeyDictionary &dict,
-        sample_format_type type,
-        sample_output_stream_options options);
-
-static inline
-unique_ptr<SampleOutputStream> open_sample_output_stream(
-        shared_ptr<ostream> output,
-        KeyDictionary &dict,
-        sample_format_type type) {
-    sample_output_stream_options options;
-    return open_sample_output_stream(output, dict, type, options);
-}
-
 static inline
 unique_ptr<ThreadSafeSampleOutputStream> thread_safe_sample_output_stream(unique_ptr<SampleOutputStream> underlying) {
     return make_unique<ThreadSafeSampleOutputStream>(move(underlying));
diff --git a/sensor/main/io.cpp b/sensor/main/io.cpp
index e73b9d4..255b4f5 100644
--- a/sensor/main/io.cpp
+++ b/sensor/main/io.cpp
@@ -1,7 +1,6 @@
 #include "trygvis/sensor/io.h"
 
 #include <map>
-#include <chrono>
 #include "json.hpp"
 #include "boost/tokenizer.hpp"
 #include <boost/algorithm/string.hpp>
@@ -16,6 +15,52 @@ using boost::tokenizer;
 using boost::escaped_list_separator;
 using json = nlohmann::json;
 
+unique_ptr<SampleStreamParser> open_sample_stream_parser(
+        shared_ptr<SampleOutputStream> output,
+        KeyDictionary &dict,
+        sample_format_type type) {
+    if (type == sample_format_type::KEY_VALUE) {
+        return make_unique<KeyValueSampleStreamParser>(output, dict);
+    } else if (type == sample_format_type::AUTO) {
+        return make_unique<AutoSampleParser>(output, dict);
+    } else {
+        throw sample_exception("No parser for format type: " + to_string(type));
+    }
+}
+
+unique_ptr<SampleOutputStream> open_sample_output_stream(
+        shared_ptr<ostream> output,
+        KeyDictionary &dict,
+        sample_format_type type,
+        sample_output_stream_options options) {
+
+    if (type == sample_format_type::CSV) {
+        return make_unique<CsvSampleOutputStream>(output, dict);
+    } else if (type == sample_format_type::KEY_VALUE) {
+        return make_unique<KeyValueSampleOutputStream>(output, dict);
+    } else if (type == sample_format_type::JSON) {
+        return make_unique<JsonSampleOutputStream>(output, dict);
+    } else if (type == sample_format_type::RRD) {
+        auto of = options.find_option<output_fields_option>();
+
+        auto tsf = options.find_option<timestamp_field_option>();
+
+        auto timestamp_key = dict.indexOf(tsf ? tsf.get()->name : "timestamp");
+
+        return make_unique<RrdSampleOutputStream>(output, dict, timestamp_key, of);
+    } else if (type == sample_format_type::SQL) {
+        auto tno = options.find_option<table_name_option>();
+
+        if (!tno.is_initialized()) {
+            throw missing_required_option_error("table name");
+        }
+
+        return make_unique<SqlSampleOutputStream>(move(output), dict, tno.get()->name);
+    } else {
+        throw sample_exception("No writer for format type: " + to_string(type));
+    }
+}
+
 ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying)
         : underlying(move(underlying)) {
 }
@@ -33,6 +78,11 @@ AddTimestampSampleOutputStream::AddTimestampSampleOutputStream(unique_ptr<Sample
 }
 
 void AddTimestampSampleOutputStream::write(SampleRecord const &sample) {
+    if (sample.at(timestamp_key)) {
+        underlying_->write(sample);
+        return;
+    }
+
     auto time_since_epoch = system_clock::now().time_since_epoch();
     auto timestamp = duration_cast<seconds>(time_since_epoch).count();
     auto timestamp_s = std::to_string(timestamp);
@@ -264,53 +314,53 @@ SqlSampleOutputStream::SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDict
         dict(dict), stream(move(stream)), table_name(table_name) {
 }
 
-void SqlSampleOutputStream::write(SampleRecord const &values) {
-    throw sample_exception("deimplemented");
-
-//    string fs, vs;
-//
-//    fs.reserve(1024);
-//    vs.reserve(1024);
-//
-//    if (filter_fields) {
-//        auto i = fields.begin();
-//
-//        while (i != fields.end()) {
-//            auto field = *i;
-//
-//            fs += field;
-//
-//            auto value = values.find(field);
-//
-//            if (value != values.end()) {
-//                vs += "'" + value->second + "'";
-//            } else {
-//                vs += "NULL";
-//            }
-//
-//            i++;
-//
-//            if (i != fields.end()) {
-//                fs += ",";
-//                vs += ",";
-//            }
-//        }
-//    } else {
-//        auto i = values.begin();
-//        while (i != values.end()) {
-//            auto v = *i++;
-//
-//            fs += v.first;
-//            vs += "'" + v.second + "'";
-//
-//            if (i != values.end()) {
-//                fs += ",";
-//                vs += ",";
-//            }
-//        }
-//    }
-//
-//    (*stream.get()) << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl << flush;
+void SqlSampleOutputStream::write(SampleRecord const &sample) {
+    string fs, vs;
+
+    fs.reserve(1024);
+    vs.reserve(1024);
+
+    auto &s = *stream.get();
+
+    bool first = true;
+    if (!dict.empty()) {
+        for (auto &key: dict) {
+            auto sample_key = sample.dict.indexOf(key->name);
+
+            auto value = sample.at(sample_key);
+
+            if (value) {
+                if (first) {
+                    first = false;
+                } else {
+                    fs += ", ";
+                    vs += ", ";
+                }
+                fs += key->name;
+                vs += value.get();
+            }
+        }
+    } else {
+        for (auto &sample_key: sample.dict) {
+            auto o = sample.at(sample_key);
+
+            if (o) {
+                if (first) {
+                    first = false;
+                } else {
+                    fs += ", ";
+                    vs += ", ";
+                }
+                // Make sure that the key is registered in the dictionary
+                dict.indexOf(sample_key->name);
+
+                fs += sample_key->name;
+                vs += o.get();
+            }
+        }
+    }
+
+    s << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl << flush;
 }
 
 void KeyValueSampleStreamParser::process(mutable_buffers_1 &buffer) {
@@ -380,46 +430,6 @@ void AutoSampleParser::process(mutable_buffers_1 &buffer) {
     }
 }
 
-unique_ptr<SampleStreamParser> open_sample_stream_parser(
-        shared_ptr<SampleOutputStream> output,
-        KeyDictionary &dict,
-        sample_format_type type) {
-    if (type == sample_format_type::KEY_VALUE) {
-        return make_unique<KeyValueSampleStreamParser>(output, dict);
-    } else if (type == sample_format_type::AUTO) {
-        return make_unique<AutoSampleParser>(output, dict);
-    } else {
-        throw sample_exception("No parser for format type: " + to_string(type));
-    }
-}
-
-unique_ptr<SampleOutputStream> open_sample_output_stream(
-        shared_ptr<ostream> output,
-        KeyDictionary &dict,
-        sample_format_type type,
-        sample_output_stream_options options) {
-
-    if (type == sample_format_type::CSV) {
-        return make_unique<CsvSampleOutputStream>(output, dict);
-    } else if (type == sample_format_type::KEY_VALUE) {
-        return make_unique<KeyValueSampleOutputStream>(output, dict);
-    } else if (type == sample_format_type::JSON) {
-        return make_unique<JsonSampleOutputStream>(output, dict);
-    } else if (type == sample_format_type::RRD) {
-        o<output_fields_option *> of = options.find_option<output_fields_option>();
-
-        o<timestamp_field_option *> tsf = options.find_option<timestamp_field_option>();
-
-        auto timestamp_key = dict.indexOf(tsf ? tsf.get()->name : "timestamp");
-
-        return make_unique<RrdSampleOutputStream>(output, dict, timestamp_key, of);
-//    } else if (type == sample_format_type::SQL) {
-//        return make_unique<SqlSampleOutputStream>(dict, move(output), table_name);
-    } else {
-        throw sample_exception("No writer for format type: " + to_string(type));
-    }
-}
-
 }
 }
 }
\ No newline at end of file
-- 
cgit v1.2.3