#include "trygvis/sensor/io.h"

#include <map>
#include "json.hpp"
#include "boost/tokenizer.hpp"
#include <boost/algorithm/string.hpp>

namespace trygvis {
namespace sensor {
namespace io {

using namespace std;
using namespace std::chrono;
using boost::tokenizer;
using boost::escaped_list_separator;
using json = nlohmann::json;

unique_ptr<SampleStreamParser> open_sample_stream_parser(
        shared_ptr<SampleOutputStream> output,
        KeyDictionary &dict,
        sample_format_type type) {
    if (type == sample_format_type::KEY_VALUE) {
        return make_unique<KeyValueSampleStreamParser>(output, dict);
    } else if (type == sample_format_type::AUTO) {
        return make_unique<AutoSampleParser>(output, dict);
    } else {
        throw sample_exception("No parser for format type: " + to_string(type));
    }
}

unique_ptr<SampleOutputStream> open_sample_output_stream(
        shared_ptr<ostream> output,
        KeyDictionary &dict,
        sample_format_type type,
        sample_output_stream_options options) {

    if (type == sample_format_type::CSV) {
        return make_unique<CsvSampleOutputStream>(output, dict);
    } else if (type == sample_format_type::KEY_VALUE) {
        return make_unique<KeyValueSampleOutputStream>(output, dict);
    } else if (type == sample_format_type::JSON) {
        return make_unique<JsonSampleOutputStream>(output, dict);
    } else if (type == sample_format_type::RRD) {
        auto of = options.find_option<output_fields_option>();

        auto tsf = options.find_option<timestamp_field_option>();

        auto timestamp_key = dict.indexOf(tsf ? tsf.get()->name : "timestamp");

        return make_unique<RrdSampleOutputStream>(output, dict, timestamp_key, of);
    } else if (type == sample_format_type::SQL) {
        auto tno = options.find_option<table_name_option>();

        if (!tno.is_initialized()) {
            throw missing_required_option_error("table name");
        }

        return make_unique<SqlSampleOutputStream>(move(output), dict, tno.get()->name);
    } else {
        throw sample_exception("No writer for format type: " + to_string(type));
    }
}

ThreadSafeSampleOutputStream::ThreadSafeSampleOutputStream(unique_ptr<SampleOutputStream> underlying)
        : underlying(move(underlying)) {
}

void ThreadSafeSampleOutputStream::write(SampleRecord const &sample) {
    std::unique_lock<std::mutex> lock(mutex);

    underlying->write(sample);
}


AddTimestampSampleOutputStream::AddTimestampSampleOutputStream(unique_ptr<SampleOutputStream> underlying,
        KeyDictionary &dict,
        const string &timestamp_name) : underlying_(move(underlying)), timestamp_key(dict.indexOf(timestamp_name)) {
}

void AddTimestampSampleOutputStream::write(SampleRecord const &sample) {
    if (sample.at(timestamp_key)) {
        underlying_->write(sample);
        return;
    }

    auto time_since_epoch = system_clock::now().time_since_epoch();
    auto timestamp = duration_cast<seconds>(time_since_epoch).count();
    auto timestamp_s = std::to_string(timestamp);

    SampleRecord copy = sample;
    copy.set(timestamp_key, timestamp_s);
    underlying_->write(copy);
}

void VectorSampleOutputStream::write(SampleRecord const &sample) {
    if (sample.empty()) {
        return;
    }

    samples.emplace_back(sample);
}

CsvSampleOutputStream::CsvSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict)
        : stream(move(stream)), headerWritten(false), dict(dict) {
}

void CsvSampleOutputStream::write(SampleRecord const &sample) {
    // Skip empty records
    if (sample.empty()) {
        return;
    }

    // Build the dict with the keys from the first sample.
    if (dict.empty()) {
        SampleKeyIndex index = 0;
        auto ptr = sample.cbegin();
        while (ptr != sample.cend()) {
            auto o = *ptr;

            if (o) {
                auto name = sample.dict.at(index)->name;
                dict.indexOf(name);
            }

            ptr++;
            index++;
        }
    }

    if (!headerWritten) {
        writeHeader();
        headerWritten = true;
    }

    auto &s = *stream.get();

    auto it = dict.begin();
    while (it != dict.end()) {
        if (it != dict.begin()) {
            s << ",";
        }

        auto key = *it++;
        auto sampleKey = sample.dict.indexOf(key->name);
        auto o = sample.at(sampleKey);

        if (o) {
            s << o.get();
        }
    }

    s << endl << flush;
}

void CsvSampleOutputStream::writeHeader() {
    auto &s = *stream.get();

    auto i = dict.begin();
    while (i != dict.end()) {
        s << (*i)->name;

        i++;

        if (i != dict.end()) {
            s << ",";
        }
    }

    s << endl << flush;
}

JsonSampleOutputStream::JsonSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict) :
        dict(dict), stream(move(stream)) {
}

void JsonSampleOutputStream::write(SampleRecord const &sample) {
    // Skip empty records
    if (sample.empty()) {
        return;
    }

    json doc({});

    if (!dict.empty()) {
        for (auto &key: dict) {
            auto sampleKey = sample.dict.indexOf(key->name);

            auto value = sample.at(sampleKey);

            if (value) {
                doc[key->name] = value.get();
            }
        }
    } else {
        for (auto &sampleKey: sample.dict) {
            auto o = sample.at(sampleKey);

            if (o) {
                // Make sure that the key is registered in the dictionary
                dict.indexOf(sampleKey->name);
                doc[sampleKey->name] = o.get();
            }
        }
    }

    *stream.get() << doc << endl << flush;
}

KeyValueSampleOutputStream::KeyValueSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict) :
        dict(dict), stream(move(stream)) {
}

void KeyValueSampleOutputStream::write(SampleRecord const &sample) {
    // Skip empty records
    if (sample.empty()) {
        return;
    }

    auto &s = *stream.get();

    bool first = true;
    if (!dict.empty()) {
        for (auto &key: dict) {
            auto sampleKey = sample.dict.indexOf(key->name);

            auto value = sample.at(sampleKey);

            if (value) {
                if (first) {
                    first = false;
                } else {
                    s << ", ";
                }
                s << key->name << "=" << value.get();
            }
        }
    } else {
        for (auto &sampleKey: sample.dict) {
            auto o = sample.at(sampleKey);

            if (o) {
                if (first) {
                    first = false;
                } else {
                    s << ", ";
                }
                // Make sure that the key is registered in the dictionary
                dict.indexOf(sampleKey->name);
                s << sampleKey->name << "=" << o.get();
            }
        }
    }

    s << endl << flush;
}

RrdSampleOutputStream::RrdSampleOutputStream(shared_ptr<ostream> stream,
        KeyDictionary &dict,
        const SampleKey *timestamp_key,
        o<output_fields_option *> output_fields) :
        stream(move(stream)), timestamp_key(timestamp_key) {

    if (output_fields) {
        for (auto field : output_fields.get()->fields) {
            keys.emplace_back(dict.indexOf(field));
        }
    } else {
        for (auto key : dict) {
            keys.emplace_back(key);
        }
    }
}

void RrdSampleOutputStream::write(SampleRecord const &sample) {
    // Skip empty records
    if (sample.empty()) {
        return;
    }

    auto &s = *stream.get();

    auto timestampO = sample.at(timestamp_key);

    if (!timestampO) {
        return;
    }

    auto timestamp = timestampO.get();

    s << timestamp;

    bool first = true;
    for (auto &key: keys) {
        if (key == timestamp_key) {
            continue;
        }

        auto value = sample.at(key);

        if (first) {
            s << "@";
            first = false;
        } else {
            s << ":";
        }

        s << (value ? value.get() : "U");
    }

    s << endl << flush;
}

SqlSampleOutputStream::SqlSampleOutputStream(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name) :
        dict(dict), stream(move(stream)), table_name(table_name) {
}

void SqlSampleOutputStream::write(SampleRecord const &sample) {
    string fs, vs;

    fs.reserve(1024);
    vs.reserve(1024);

    auto &s = *stream.get();

    bool first = true;
    if (!dict.empty()) {
        for (auto &key: dict) {
            auto sample_key = sample.dict.indexOf(key->name);

            auto value = sample.at(sample_key);

            if (value) {
                if (first) {
                    first = false;
                } else {
                    fs += ", ";
                    vs += ", ";
                }
                fs += "\"" + key->name + "\"";
                vs += "'" + value.get() + "'";
            }
        }
    } else {
        for (auto &sample_key: sample.dict) {
            auto o = sample.at(sample_key);

            if (o) {
                if (first) {
                    first = false;
                } else {
                    fs += ", ";
                    vs += ", ";
                }
                // Make sure that the key is registered in the dictionary
                dict.indexOf(sample_key->name);

                fs += "\"" + sample_key->name + "\"";
                vs += "'" + o.get() + "'";
            }
        }
    }

    s << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl << flush;
}

void KeyValueSampleStreamParser::process(mutable_buffers_1 &buffer) {

    size_t size = buffer_size(buffer);

    if (size == 0 && line->size()) {
        process_line(line);
        line = make_shared<vector<uint8_t>>();
        return;
    }

    auto data = boost::asio::buffer_cast<const uint8_t *>(buffer);

    for (int i = 0; i < size; i++) {
        uint8_t b = data[i];

        if (b == packet_delimiter) {
            process_line(line);
            line = make_shared<vector<uint8_t>>();
        } else {
            line->push_back(b);
        }
    }

}

void KeyValueSampleStreamParser::process_line(shared_ptr<vector<uint8_t>> packet) {
    auto s = std::string((char *) packet->data(), packet->size());

    typedef tokenizer<escaped_list_separator<char>> Tokenizer;
    Tokenizer tokens(s);

    SampleRecord sample(dict_);

    for (auto token : tokens) {
        auto index = token.find('=');

        if (index == string::npos) {
            continue;
        }

        auto name = token.substr(0, index);
        boost::algorithm::trim(name);
        auto value = token.substr(index + 1);
        boost::algorithm::trim(value);

        auto key = dict_.indexOf(name);
        sample.set(key, value);
    }

    output->write(sample);
}

AutoSampleParser::AutoSampleParser(shared_ptr<SampleOutputStream> output, KeyDictionary &dict) :
        SampleStreamParser(sample_format_type::AUTO, dict), keyValueParser(new KeyValueSampleStreamParser(output, dict)) {
    // Directly select the parser now until we have more than one parser
    parser = std::move(keyValueParser);
    type_ = sample_format_type::KEY_VALUE;
}

void AutoSampleParser::process(mutable_buffers_1 &buffer) {
    if (parser) {
        parser->process(buffer);
    } else {
        throw runtime_error("Not implemented yet");
    }
}

}
}
}