#ifndef TRYGVIS_CASSANDRA_SUPPORT_H
#define TRYGVIS_CASSANDRA_SUPPORT_H

#include <cassandra.h>

#include <string>
#include <cassert>
#include <algorithm>
#include <stdexcept>
#include <functional>
#include <vector>
#include <iostream>
#include <sstream>
#include <iomanip>
#include <mutex>

namespace trygvis {
namespace cassandra_support {

using namespace std;

class cassandra_error : public runtime_error {
public:
    cassandra_error(const string &context, const string &error) : runtime_error(
            "Cassandra error: context=" + context + ", error=" + error) {
    }

    cassandra_error(const string &context, CassError error) : runtime_error(
            "Cassandra error: context=" + context + ", error=" + cass_error_desc(error)) {
    }
};

/**
 * Executes a CQL query that takes no bound parameters and blocks until it has completed.
 *
 * @param session session to execute the query on
 * @param query   CQL text; taken by reference so the text is not copied per call
 * @return the error code reported by the completed future
 */
static CassError execute_query(CassSession *session, const string &query) {
    // Second argument is the number of bound parameters: none for a plain query.
    CassStatement *statement = cass_statement_new(query.c_str(), 0);

    CassFuture *future = cass_session_execute(session, statement);
    cass_future_wait(future);

    const CassError rc = cass_future_error_code(future);

    // Both driver objects are owned by this function and released before returning.
    cass_future_free(future);
    cass_statement_free(statement);

    return rc;
}

/**
 * Turns a driver error code into an exception.
 *
 * Declared inline because this header defines the function; without it, including the
 * header from more than one translation unit produces duplicate definitions at link time.
 * The code is taken by value so that the result of a driver call can be passed directly.
 *
 * @param context short description of the operation that produced err
 * @param err     driver error code to check
 * @throws cassandra_error if err is anything other than CASS_OK
 */
inline void assert_ok(const string &context, CassError err) {
    if (err != CASS_OK) {
        throw cassandra_error(context, err);
    }
}

/**
 * Base class for move-only wrappers around a raw pointer to a driver object.
 *
 * The base only stores the pointer; it never frees it. Derived classes are expected to
 * release the object in their own destructor. All special members are protected, so the
 * template can only be used through a derived class.
 *
 * @tparam Underlying pointee type (may be const-qualified)
 */
template<typename Underlying>
class cassandra_wrapper {
public:
    /** Implicit conversion so the wrapper can be passed where a const pointer is expected. */
    inline
    operator const Underlying *() const {
        return underlying_;
    }

    /** @return the wrapped pointer; nullptr after the wrapper has been moved from. */
    inline
    Underlying *underlying() const {
        return underlying_;
    }

protected:
    cassandra_wrapper(Underlying *underlying) : underlying_(underlying) {
    }

    /**
     * Takes over the pointer held by other and leaves other holding nullptr.
     * noexcept: only pointer copies are performed, and it lets defaulted move
     * constructors of derived classes be noexcept as well.
     */
    cassandra_wrapper(cassandra_wrapper &&other) noexcept : underlying_(other.underlying_) {
        other.underlying_ = nullptr;
    }

    cassandra_wrapper(const cassandra_wrapper &) = delete;

    cassandra_wrapper &operator=(const cassandra_wrapper &) = delete;

    virtual ~cassandra_wrapper() {
        // Overwrite the pointer with a recognisable bogus value; presumably a debugging aid
        // to make use of a destroyed wrapper stand out.
        underlying_ = reinterpret_cast<Underlying *>(0xaabbccdd);
    }

    Underlying *underlying_;
};

/**
 * Move-only owner of a CassResult; the result is freed when the owner is destroyed.
 */
class cassandra_result : public cassandra_wrapper<const CassResult> {
public:
    /**
     * Takes ownership of result.
     *
     * @throws cassandra_error if result is null
     */
    cassandra_result(const CassResult *result) : cassandra_wrapper(result) {
        if (result == nullptr) {
            throw cassandra_error("cassandra_result()", "result is NULL");
        }
    }

    cassandra_result(cassandra_result &&other) = default;

    cassandra_result(const cassandra_result &) = delete;

    ~cassandra_result() {
        // A moved-from instance holds nullptr (see the base move constructor). Handing a
        // null pointer to cass_result_free() is not documented as safe, so skip the call.
        if (underlying_ != nullptr) {
            cass_result_free(underlying_);
        }
    }

    cassandra_result &operator=(const cassandra_result &) = delete;
};

/**
 * Heap-allocated, self-deleting wrapper that hands the completion of a CassFuture to a
 * callback installed with then().
 *
 * Lifecycle:
 *  - wrap() allocates the object and registers it as the future's completion callback.
 *  - Two events must both happen: the driver completes the future, and the user calls then().
 *    Whichever happens second runs the user callback, frees the CassFuture and deletes the
 *    object. The two flags tracking this are guarded by a mutex.
 *  - Consequently the pointer returned by wrap() must not be used after then() has been
 *    called, other than from inside the callback itself.
 */
class cassandra_future2 {
public:
    // TODO: the values shouldn't be the future, but a specific result object instead
    typedef void(callback_t)(cassandra_future2 &);

    /** Allocates a wrapper for future; the wrapper deletes itself (see class comment). */
    static
    cassandra_future2 *wrap(CassFuture *future) {
        return new cassandra_future2(future);
    }

private:
    typedef std::lock_guard<std::mutex> guard;

    callback_t *callback_ = nullptr;
    CassFuture *future;
    bool has_callback = false;   // then() has been called
    bool has_data = false;       // the driver has completed the future

    std::mutex mutex;            // guards callback_, has_callback and has_data

    // Initialisers are listed in member declaration order.
    cassandra_future2(CassFuture *future) : callback_(default_callback), future(future) {
        cout << "cassandra_future2: this=" << std::hex << std::setw(8) << this <<
        ", future=" << std::hex << std::setw(8) << future << endl;
        cass_future_set_callback(future, callback, this);
    }

    static void default_callback(cassandra_future2 &) {
        cout << "default_callback" << endl;
    }

    // Private: instances are only destroyed through `delete this`. The CassFuture is freed
    // by deliver_locked(), not here.
    ~cassandra_future2() {
        cout << "~cassandra_future2: this=" << std::hex << std::setw(8) << this << endl;
    }

    cassandra_future2(const cassandra_future2 &) = delete;

    cassandra_future2 &operator=(const cassandra_future2 &) = delete;

public:
    /**
     * Installs the callback to run when the future completes. If the future has already
     * completed, the callback runs immediately on the calling thread and the object is
     * deleted before then() returns.
     *
     * @param callback_ function to invoke with this object; must not be null
     */
    void then(callback_t callback_) {
        assert(callback_ != nullptr);

        bool do_delete = false;

        {
            guard lock(mutex);

            this->callback_ = callback_;
            has_callback = true;

            if (has_data) {
                cout << "Had early data" << endl;
                // Deliver directly. Re-entering callback(CassFuture *) here would lock the
                // non-recursive mutex a second time and free/delete everything twice.
                deliver_locked();
                do_delete = true;
            }
        }

        // Deleted only after the lock guard has released the mutex that lives inside *this.
        if (do_delete) {
            delete this;
        }
    }

    bool fetched = false;

    /**
     * Returns the result of the future. May be called once.
     *
     * @throws cassandra_error on a second call, or if the future has no result
     *         (cass_future_get_result() returned null)
     */
    cassandra_result result() {
        if (fetched) {
            throw cassandra_error("cassandra_result::result()", "Already fetched");
        }
        fetched = true;
        // The null check lives in the cassandra_result constructor; nothing may touch the
        // raw pointer before it.
        return cassandra_result(cass_future_get_result(future));
    }

    /** @return the error message the driver reports for future. */
    static
    string error_message(CassFuture * future) {
        const char *message;
        size_t message_length;
        cass_future_error_message(future, &message, &message_length);
        return string(message, message_length);
    }

    string error_message() {
        return error_message(future);
    }

    CassError error_code() const {
        return cass_future_error_code(future);
    }

    bool operator!() const {
        return !ok();
    }

    operator bool() const {
        return ok();
    }

    /** @return true if the future's error code is CASS_OK. */
    bool ok() const {
        return error_code() == CASS_OK;
    }

private:

    // Trampoline registered with cass_future_set_callback(); data is the wrapper itself.
    static void callback(CassFuture *f, void *data) {
        cassandra_future2 *c_f = static_cast<cassandra_future2 *>(data);
        c_f->callback(f);
    }

    // Completion notification from the driver.
    void callback(CassFuture *future) {
        bool do_delete = false;

        {
            guard lock(mutex);

            cout << "cassandra_future::callback, error=" << cassandra_future2::error_message(future) << endl;
            cout << "cassandra_future::callback, this=" << std::hex << std::setw(8) << (this) << endl;

            has_data = true;
            if (has_callback) {
                cout << "had callback already" << endl;
                deliver_locked();
                do_delete = true;
            }
        }

        if (do_delete) {
            delete this;
        }
    }

    // Runs the installed callback and releases the CassFuture. The caller must hold `mutex`
    // and must delete the object after releasing it.
    void deliver_locked() {
        callback_(*this);

        cout << "freeing future: " << std::hex << std::setw(8) << future << endl;
        cass_future_free(future);
    }
};

/**
 * Owner of a CassFuture; the future is freed when the owner is destroyed.
 *
 * Also provides the static callback() trampoline used to dispatch a completed future to a
 * std::function stored in a heap-allocated `tmp`.
 */
class cassandra_future : public cassandra_wrapper<CassFuture> {
public:
    typedef std::function<void(cassandra_future &)> callback_type;

    /** Heap-allocated holder passed through the driver's void* callback argument. */
    struct tmp {
        cassandra_future::callback_type cb;
    };

    cassandra_future(CassFuture *future) : cassandra_wrapper(future) {
    }

    ~cassandra_future() {
        // Nothing to release if the wrapper was constructed around a null pointer.
        if (underlying_ != nullptr) {
            cass_future_free(underlying_);
        }
    }

    cassandra_future(const cassandra_future &) = delete;

    cassandra_future &operator=(const cassandra_future &) = delete;

    /**
     * Returns the result of the future. May be called once.
     *
     * @throws cassandra_error on a second call, or if the future has no result
     *         (cass_future_get_result() returned null)
     */
    cassandra_result result() {
        if (fetched) {
            throw cassandra_error("cassandra_result::result()", "Already fetched");
        }
        fetched = true;
        // The null check lives in the cassandra_result constructor; nothing may touch the
        // raw pointer before it.
        return cassandra_result(cass_future_get_result(underlying()));
    }

    static
    string error_message(CassFuture *future) {
        return cassandra_future2::error_message(future);
    }

    string error_message() {
        return error_message(underlying());
    }

    CassError error_code() const {
        return cass_future_error_code(underlying());
    }

    bool operator!() const {
        return error_code() != CASS_OK;
    }

    /** @return true if the future's error code is CASS_OK. */
    bool ok() const {
        return error_code() == CASS_OK;
    }

    /**
     * Completion callback for cass_future_set_callback(). data must point to a `tmp`
     * allocated with new; it is deleted here. The future f is wrapped in a local
     * cassandra_future, so it is freed when this function returns.
     */
    static void callback(CassFuture *f, void *data) {
        tmp *tmp_ = static_cast<tmp *>(data);
        auto cb = tmp_->cb;
        delete tmp_;
        cassandra_future c_f(f);
        cb(c_f);
    }

private:
    bool fetched = false;
};

class cassandra_tuple {
public:
    cassandra_tuple(size_t item_count) {
        tuple = cass_tuple_new(item_count);
    }

    ~cassandra_tuple() {
        cass_tuple_free(tuple);
    }

    cassandra_tuple(const cassandra_tuple &) = delete;

    cassandra_tuple &operator=(const cassandra_tuple &) = delete;

    void set(size_t i, const cass_int32_t value) {
        cass_tuple_set_int32(tuple, i, value);
    }

    void set(size_t i, const string &s) {
        cass_tuple_set_string_n(tuple, i, s.c_str(), s.length());
    }

    CassTuple *tuple;
};

class cassandra_collection {
public:
    cassandra_collection(CassCollectionType type, size_t item_count) {
        collection = cass_collection_new(type, item_count);
    }

    ~cassandra_collection() {
        cass_collection_free(collection);
    }

    cassandra_collection(const cassandra_collection &) = delete;

    cassandra_collection &operator=(const cassandra_collection &) = delete;

    void append(const cassandra_tuple &&tuple) {
        cass_collection_append_tuple(collection, tuple.tuple);
    }

    void append(const string &&value) {
        cass_collection_append_string(collection, value.c_str());
    }

    void append(const string &value) {
        cass_collection_append_string(collection, value.c_str());
    }

    operator CassCollection *() const {
        return collection;
    };

private:
    CassCollection *collection;
};

/**
 * Blocks until future has completed, dispatches to one of two handlers, and frees the future.
 *
 * Declared inline because this header defines the function; without it, including the
 * header from more than one translation unit produces duplicate definitions at link time.
 *
 * @param future  future to wait for; ownership passes to this function
 * @param success invoked with the future when its error code is CASS_OK
 * @param error   invoked with the future and its error code otherwise
 */
inline void handle_future(CassFuture *future, std::function<void(CassFuture *)> success,
                   std::function<void(CassFuture *, CassError)> error) {
    // Frees the future on every exit path, including when a handler throws.
    struct future_releaser {
        CassFuture *held;

        ~future_releaser() {
            cass_future_free(held);
        }
    } releaser{future};

    cass_future_wait(future);

    const CassError rc = cass_future_error_code(future);

    if (rc == CASS_OK) {
        success(future);
    } else {
        error(future, rc);
    }
}

class cassandra_statement {
public:
    cassandra_statement(string q, size_t argument_count) {
        statement = cass_statement_new(q.c_str(), argument_count);
    };

    ~cassandra_statement() {
        cass_statement_free(statement);
    }

    cassandra_statement(const cassandra_statement &) = delete;

    cassandra_statement &operator=(const cassandra_statement &) = delete;

    void bind(size_t i, const string &value) {
        auto err = cass_statement_bind_string(statement, i, value.c_str());
        assert_ok("cass_statement_bind_string", err);
    }

    void bind(size_t i, const char *value) {
        auto err = cass_statement_bind_string(statement, i, value);
        assert_ok("cass_statement_bind_string", err);
    }

    void bind(size_t i, const cass_int64_t value) {
        auto err = cass_statement_bind_int64(statement, i, value);
        assert_ok("cass_statement_bind_int64", err);
    }

    void bind(size_t i, const CassCollection *value) {
        auto err = cass_statement_bind_collection(statement, i, value);
        assert_ok("cass_statement_bind_collection", err);
    }

    void bind(size_t i, const cassandra_collection &value) {
        auto err = cass_statement_bind_collection(statement, i, value);
        assert_ok("cass_statement_bind_collection", err);
    }

    void bind(size_t i, const std::vector<string> &&values) {
        cassandra_collection c(CassCollectionType::CASS_COLLECTION_TYPE_LIST, values.size());

        for (const auto &value : values) {
            c.append(value);
        }

        auto err = cass_statement_bind_collection(statement, i, c);
        assert_ok("cass_statement_bind_collection", err);
    }

    operator CassStatement *() const {
        return statement;
    }

    CassStatement *underlying() const {
        return statement;
    }

private:
    CassStatement *statement;
};

class cassandra_session {
public:
    cassandra_session() {
        session = cass_session_new();
    }

    ~cassandra_session() {
        cass_session_free(session);
    }

    cassandra_session(const cassandra_session &) = delete;

    cassandra_session &operator=(const cassandra_session &) = delete;

    auto connect(CassCluster *cluster) {
        return cass_session_connect(session, cluster);
    }

    [[deprecated]]
    void execute(cassandra_statement &&stmt, cassandra_future::callback_type cb_) {
        auto future = cass_session_execute(session, stmt.underlying());
        cass_future_set_callback(future, cassandra_future::callback, new cassandra_future::tmp{cb_});
    }

    cassandra_future2 *execute2(cassandra_statement &&stmt) {
        auto future = cass_session_execute(session, stmt.underlying());
        return cassandra_future2::wrap(future);
    }

    operator CassSession *() const {
        return session;
    }

    CassSession *underlying() const {
        return session;
    }

private:
    CassSession *session;
};

class cassandra_logging {
public:
    cassandra_logging(CassLogLevel log_level = CASS_LOG_DEBUG) {
        cass_log_set_level(log_level);
        cass_log_set_callback(on_log, this);
    }

    ~cassandra_logging() {
    }

private:
    static
    void on_log(const CassLogMessage *message, void *data) {
        stringstream buf;
        buf << message->time_ms << " " << cass_log_level_string(message->severity) << " " <<
        message->file << ":" << message->function << ":" <<
        message->line << ":" << message->message;

        cout << "CASSANDRA: " << buf.str() << endl;
    }
};

}
}

#endif