#include "trygvis/sensor/io.h"

#include <map>
#include <json.hpp>
#include <boost/tokenizer.hpp>
#include <boost/algorithm/string.hpp>

namespace trygvis {
namespace sensor {
namespace io {

using namespace std;
using namespace std::chrono;
using boost::tokenizer;
using boost::escaped_list_separator;
using json = nlohmann::json;

// Creates the stream parser that matches the requested sample format.
//
// KEY_VALUE yields a KeyValueSampleStreamParser and AUTO an AutoSampleParser; both are
// constructed with `output` and `dict`. Every other format type results in a
// sample_exception.
unique_ptr<SampleStreamParser> open_sample_stream_parser(
        shared_ptr<SampleConsumer> output,
        KeyDictionary &dict,
        sample_format_type type) {
    switch (type) {
        case sample_format_type::KEY_VALUE:
            return make_unique<KeyValueSampleStreamParser>(output, dict);

        case sample_format_type::AUTO:
            return make_unique<AutoSampleParser>(output, dict);

        default:
            throw sample_exception("No parser for format type: " + to_string(type));
    }
}

// Creates the sample writer that matches the requested output format.
//
// CSV, KEY_VALUE and JSON writers only need the stream and the dictionary.
// RRD additionally looks up the optional output_fields_option and timestamp_field_option;
// when no timestamp field option is given the key named "timestamp" is used.
// SQL requires a table_name_option and throws missing_required_option_error without it.
// Every other format type results in a sample_exception.
unique_ptr<SampleConsumer> open_sample_writer(
        shared_ptr<ostream> output,
        KeyDictionary &dict,
        sample_format_type type,
        sample_output_stream_options options) {
    switch (type) {
        case sample_format_type::CSV:
            return make_unique<CsvWriterSampleConsumer>(output, dict);

        case sample_format_type::KEY_VALUE:
            return make_unique<KeyValueWriterSampleConsumer>(output, dict);

        case sample_format_type::JSON:
            return make_unique<JsonWriterSampleConsumer>(output, dict);

        case sample_format_type::RRD: {
            auto fields_option = options.find_option<output_fields_option>();
            auto timestamp_option = options.find_option<timestamp_field_option>();

            // Fall back to the conventional field name when none was configured.
            auto timestamp_key = dict.indexOf(timestamp_option ? timestamp_option.value()->name : "timestamp");

            return make_unique<RrdWriterSampleConsumer>(output, dict, timestamp_key, fields_option);
        }

        case sample_format_type::SQL: {
            auto table_option = options.find_option<table_name_option>();

            if (!table_option) {
                throw missing_required_option_error("table name");
            }

            return make_unique<SqlWriterSampleConsumer>(move(output), dict, table_option.value()->name);
        }

        default:
            throw sample_exception("No writer for format type: " + to_string(type));
    }
}

// Takes ownership of `underlying`, the consumer that samples are forwarded to.
ThreadSafeSampleConsumer::ThreadSafeSampleConsumer(unique_ptr<SampleConsumer> underlying)
        : underlying(std::move(underlying)) {}

// Forwards `sample` to the wrapped consumer while holding `mutex`, so calls into the
// wrapped consumer made through this object never overlap.
void ThreadSafeSampleConsumer::onSample(SampleRecord const &sample) {
    std::lock_guard<std::mutex> guard(mutex);
    underlying->onSample(sample);
}

// Takes ownership of `underlying` and resolves `timestamp_name` to its key in `dict`.
AddTimestampSampleConsumer::AddTimestampSampleConsumer(
        unique_ptr<SampleConsumer> underlying,
        KeyDictionary &dict,
        const string &timestamp_name)
        : underlying_(std::move(underlying)),
          timestamp_key(dict.indexOf(timestamp_name)) {}

// Forwards `sample` to the wrapped consumer, making sure it carries a timestamp.
//
// A sample that already has a value for the timestamp key is passed through untouched.
// Otherwise a copy is forwarded with the timestamp key set to the current system_clock
// time, expressed as whole seconds since the clock's epoch.
void AddTimestampSampleConsumer::onSample(SampleRecord const &sample) {
    if (!sample.at(timestamp_key)) {
        const auto now = system_clock::now();
        const auto epoch_seconds = duration_cast<seconds>(now.time_since_epoch()).count();
        auto epoch_seconds_text = std::to_string(epoch_seconds);

        // The incoming record is const, so stamp a copy instead.
        SampleRecord stamped = sample;
        stamped.set(timestamp_key, epoch_seconds_text);
        underlying_->onSample(stamped);
        return;
    }

    underlying_->onSample(sample);
}

// Appends a copy of every non-empty sample to `samples`; empty samples are dropped.
void VectorSampleOutputStream::onSample(SampleRecord const &sample) {
    if (!sample.empty()) {
        samples.emplace_back(sample);
    }
}

// Stores the output stream and the dictionary that defines the CSV columns.
// The header line has not been written yet at this point.
CsvWriterSampleConsumer::CsvWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict)
        : stream(std::move(stream)),
          headerWritten(false),
          dict(dict) {}

// Writes `sample` as one comma-separated line, with columns in the order of `dict`.
//
// Empty samples are ignored. If `dict` has no keys yet, it is populated from the keys
// that carry a value in this sample. The header line is written once, before the first
// data line. A column without a value in the sample is left blank.
void CsvWriterSampleConsumer::onSample(SampleRecord const &sample) {
    if (sample.empty()) {
        return;
    }

    // No columns configured: derive them from the values present in this sample.
    if (dict.empty()) {
        SampleKeyIndex position = 0;
        for (auto cursor = sample.cbegin(); cursor != sample.cend(); ++cursor, ++position) {
            auto field = *cursor;

            if (!field) {
                continue;
            }

            dict.indexOf(sample.dict.at(position)->name);
        }
    }

    if (!headerWritten) {
        writeHeader();
        headerWritten = true;
    }

    ostream &out = *stream;

    bool needs_separator = false;
    for (auto &column : dict) {
        if (needs_separator) {
            out << ",";
        }
        needs_separator = true;

        // Columns are matched by name because the sample uses its own dictionary.
        auto sample_key = sample.dict.indexOf(column->name);
        auto cell = sample.at(sample_key);

        if (cell) {
            out << cell.value();
        }
    }

    out << endl << flush;
}

void CsvWriterSampleConsumer::writeHeader() {
    auto &s = *stream;

    auto i = dict.begin();
    while (i != dict.end()) {
        s << (*i)->name;

        i++;

        if (i != dict.end()) {
            s << ",";
        }
    }

    s << endl << flush;
}

// Stores the dictionary that selects the fields to write and the output stream.
JsonWriterSampleConsumer::JsonWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict)
        : dict(dict),
          stream(std::move(stream)) {}

// Writes `sample` as one JSON document per line.
//
// Empty samples are ignored. When `dict` has keys, only those keys are written (looked
// up in the sample by name). When `dict` is empty, every value present in the sample is
// written and its key is registered in `dict`. Keys without a value are omitted.
void JsonWriterSampleConsumer::onSample(SampleRecord const &sample) {
    if (sample.empty()) {
        return;
    }

    json doc({});

    if (dict.empty()) {
        for (auto &sampleKey : sample.dict) {
            auto field = sample.at(sampleKey);

            if (!field) {
                continue;
            }

            // Make sure that the key is registered in the dictionary
            dict.indexOf(sampleKey->name);
            doc[sampleKey->name] = field.value();
        }
    } else {
        for (auto &key : dict) {
            auto field = sample.at(sample.dict.indexOf(key->name));

            if (!field) {
                continue;
            }

            doc[key->name] = field.value();
        }
    }

    ostream &out = *stream;
    out << doc << endl << flush;
}

// Stores the dictionary that selects the fields to write and the output stream.
KeyValueWriterSampleConsumer::KeyValueWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict)
        : dict(dict),
          stream(std::move(stream)) {}

// Writes `sample` as a single line of `name=value` pairs separated by ", ".
//
// Empty samples are ignored. When `dict` has keys, only those keys are written (looked
// up in the sample by name). When `dict` is empty, every value present in the sample is
// written and its key is registered in `dict`. Keys without a value are omitted.
void KeyValueWriterSampleConsumer::onSample(SampleRecord const &sample) {
    if (sample.empty()) {
        return;
    }

    ostream &out = *stream;

    // Emits the pair separator before every pair except the first one.
    bool first = true;
    auto separate = [&out, &first]() {
        if (first) {
            first = false;
        } else {
            out << ", ";
        }
    };

    if (dict.empty()) {
        for (auto &sampleKey : sample.dict) {
            auto field = sample.at(sampleKey);

            if (!field) {
                continue;
            }

            separate();
            // Make sure that the key is registered in the dictionary
            dict.indexOf(sampleKey->name);
            out << sampleKey->name << "=" << field.value();
        }
    } else {
        for (auto &key : dict) {
            auto field = sample.at(sample.dict.indexOf(key->name));

            if (!field) {
                continue;
            }

            separate();
            out << key->name << "=" << field.value();
        }
    }

    out << endl << flush;
}

// Stores the output stream and the timestamp key, and fixes the list of keys to write.
//
// With an output_fields option the keys are the option's fields, resolved through
// `dict` in the order given. Without it, the keys are whatever `dict` contains at
// construction time.
RrdWriterSampleConsumer::RrdWriterSampleConsumer(shared_ptr<ostream> stream,
        KeyDictionary &dict,
        const SampleKey *timestamp_key,
        o<output_fields_option *> output_fields) :
        stream(std::move(stream)), timestamp_key(timestamp_key) {

    if (!output_fields) {
        for (const auto &key : dict) {
            keys.emplace_back(key);
        }
        return;
    }

    for (const auto &field_name : output_fields.value()->fields) {
        keys.emplace_back(dict.indexOf(field_name));
    }
}

// Writes `sample` as one line of the form `<timestamp>@<v1>:<v2>:...`.
//
// Empty samples and samples without a value for the timestamp key are ignored.
// The values follow the order of `keys`, skipping the timestamp key itself; a key
// without a value in the sample is written as "U".
void RrdWriterSampleConsumer::onSample(SampleRecord const &sample) {
    if (sample.empty()) {
        return;
    }

    ostream &out = *stream;

    auto timestamp_field = sample.at(timestamp_key);
    if (!timestamp_field) {
        return;
    }

    auto timestamp = timestamp_field.value();
    out << timestamp;

    // "@" separates the timestamp from the first value, ":" separates the values.
    const char *separator = "@";
    for (auto &key : keys) {
        if (key == timestamp_key) {
            continue;
        }

        auto field = sample.at(key);

        out << separator;
        separator = ":";

        if (field) {
            out << field.value();
        } else {
            out << "U";
        }
    }

    out << endl << flush;
}

// Stores the dictionary that selects the columns, the output stream and the name of
// the table that the generated INSERT statements target.
SqlWriterSampleConsumer::SqlWriterSampleConsumer(shared_ptr<ostream> stream, KeyDictionary &dict, string table_name)
        : dict(dict),
          stream(std::move(stream)),
          table_name(table_name) {}

void SqlWriterSampleConsumer::onSample(SampleRecord const &sample) {
    string fs, vs;

    fs.reserve(1024);
    vs.reserve(1024);

    auto &s = *stream;

    bool first = true;
    if (!dict.empty()) {
        for (auto &key: dict) {
            auto sample_key = sample.dict.indexOf(key->name);

            auto value = sample.at(sample_key);

            if (value) {
                if (first) {
                    first = false;
                } else {
                    fs += ", ";
                    vs += ", ";
                }
                fs += "\"" + key->name + "\"";
                vs += "'" + value.value() + "'";
            }
        }
    } else {
        for (auto &sample_key: sample.dict) {
            auto o = sample.at(sample_key);

            if (o) {
                if (first) {
                    first = false;
                } else {
                    fs += ", ";
                    vs += ", ";
                }
                // Make sure that the key is registered in the dictionary
                dict.indexOf(sample_key->name);

                fs += "\"" + sample_key->name + "\"";
                vs += "'" + o.value() + "'";
            }
        }
    }

    s << "INSERT INTO " << table_name << "(" << fs << ") VALUES(" << vs << ");" << endl << flush;
}

// Consumes the bytes in `buffer`, emitting one sample per completed line.
//
// Bytes are accumulated in `line` until `packet_delimiter` is seen; the accumulated
// line is then handed to process_line() and a fresh line buffer is started. NUL bytes
// are dropped. A trailing partial line stays buffered for the next call (or finish()).
//
// Returns the number of lines that were completed by this buffer.
int KeyValueSampleStreamParser::process(mutable_buffers_1 &buffer) {

    const size_t size = buffer_size(buffer);

    if (size == 0) {
        return 0;
    }

    auto data = boost::asio::buffer_cast<const uint8_t *>(buffer);

    int count = 0;
    // The index is a size_t so that it matches `size` and cannot overflow on large buffers.
    for (size_t i = 0; i < size; i++) {
        uint8_t b = data[i];

        if (b == '\0') {
            continue;
        } else if (b == packet_delimiter) {
            process_line(line);
            count++;
            line = make_shared<vector<uint8_t>>();
        } else {
            line->push_back(b);
        }
    }

    return count;
}

int KeyValueSampleStreamParser::finish() {
    if (line->size()) {
        process_line(line);

        return 1;
    }

    return 0;
}

// Parses one line of `name=value` tokens into a sample and forwards it to `output`.
//
// The line is split with Boost's escaped_list_separator using its default settings.
// Tokens without '=' are ignored; for the others, the text before the first '=' is
// the name and the rest is the value, both trimmed of surrounding whitespace. Each
// name is resolved (and registered if needed) through `dict_`. The sample is forwarded
// even when no token produced a value.
void KeyValueSampleStreamParser::process_line(shared_ptr<vector<uint8_t>> &packet) {
    using Tokenizer = tokenizer<escaped_list_separator<char>>;

    const std::string text(reinterpret_cast<const char *>(packet->data()), packet->size());
    Tokenizer tokens(text);

    SampleRecord sample(dict_);

    for (const auto &token : tokens) {
        const auto separator = token.find('=');

        if (separator != string::npos) {
            auto name = token.substr(0, separator);
            boost::algorithm::trim(name);
            auto value = token.substr(separator + 1);
            boost::algorithm::trim(value);

            sample.set(dict_.indexOf(name), value);
        }
    }

    output->onSample(sample);
}

// Builds a parser that is meant to detect the input format on its own.
//
// Only the key/value parser exists so far, so it is selected immediately and the
// reported type is set to KEY_VALUE.
AutoSampleParser::AutoSampleParser(shared_ptr<SampleConsumer> output, KeyDictionary &dict)
        : SampleStreamParser(sample_format_type::AUTO, dict),
          keyValueParser(make_unique<KeyValueSampleStreamParser>(output, dict)) {
    // Directly select the parser now until we have more than one parser
    type_ = sample_format_type::KEY_VALUE;
    parser = std::move(keyValueParser);
}

// Delegates to the selected parser and returns its result.
// Throws runtime_error while no parser has been selected.
int AutoSampleParser::process(mutable_buffers_1 &buffer) {
    if (!parser) {
        throw runtime_error("Not implemented yet");
    }

    return parser->process(buffer);
}

// Delegates to the selected parser's finish() and returns its result.
// Throws runtime_error while no parser has been selected.
int AutoSampleParser::finish() {
    if (!parser) {
        throw runtime_error("Not implemented yet");
    }

    return parser->finish();
}

}
}
}