znone
2019-06-16 ebd997c5ce55fd260696f9bb117f1f06c09a4759
通过 MariaDB 的非阻塞函数访问 MySQL。
4 files added
10 files modified
2692 ■■■■■ changed files
README.md 52 ●●●●● patch | view | raw | blame | history
include/qtl_async.hpp 421 ●●●●● patch | view | raw | blame | history
include/qtl_common.hpp 16 ●●●●● patch | view | raw | blame | history
include/qtl_database_pool.hpp 165 ●●●●● patch | view | raw | blame | history
include/qtl_mysql.hpp 1300 ●●●● patch | view | raw | blame | history
include/qtl_mysql_pool.hpp 43 ●●●●● patch | view | raw | blame | history
include/qtl_odbc.hpp 2 ●●● patch | view | raw | blame | history
test/TestMariaDB.cpp 421 ●●●●● patch | view | raw | blame | history
test/TestMysql.cpp 2 ●●● patch | view | raw | blame | history
test/test_mariadb.mak 22 ●●●●● patch | view | raw | blame | history
test/test_mysql.mak 4 ●●●● patch | view | raw | blame | history
test/test_sqlite.mak 13 ●●●● patch | view | raw | blame | history
test/vs/test.sln 6 ●●●●● patch | view | raw | blame | history
test/vs/test_mariadb/test_mariadb.vcproj 225 ●●●●● patch | view | raw | blame | history
README.md
@@ -172,6 +172,58 @@
```
#### 12. 异步调用数据库
通过类async_connection可以异步调用数据库。所有的异步函数都需要提供一个回调函数来接收操作完成后的结果。如果异步调用中发生错误,错误会作为回调函数的参数返回给调用者。
```
qtl::mysql::async_connection connection;
connection.open(ev, [&connection](const qtl::mysql::error& e) {
    ...
});
```
异步调用在事件循环中完成。ev是事件循环对象。QTL只提出它对事件循环的需求,并不实现事件循环。QTL要求事件循环提供如下接口,该接口由用户代码实现:
```
class EventLoop
{
public:
    // 把数据库连接添加到事件循环中
    template<typename Connection>
    qtl::event_handler* add(Connection* connection);
    // 在事件循环中添加一个超时任务
    template<typename Handler>
    qtl::event* set_timeout(const timeval& timeout, Handler&& handler);
};
```
qtl::event是QTL中定义的一个事件项接口,用户代码同样应该实现它:
```
struct event
{
    // IO事件标志
    enum io_flags
    {
        ef_read = 0x1,
        ef_write = 0x2,
        ef_exception = 0x4,
        ef_timeout =0x8,
        ev_all = ef_read | ef_write | ef_exception
    };
    virtual ~event() { }
    // 设置IO处理器
    virtual void set_io_handler(int flags, long timeout, std::function<void(int)>&&) = 0;
    // 从事件循环中移除事件项
    virtual void remove() = 0;
    // 判断该事件项是否在等待IO中
    virtual bool is_busying() = 0;
};
```
数据库连接通常不是线程安全的。用户代码应该保证,一个连接只能同时由一个线程使用。
## 有关MySQL的说明
访问MySQL时,包含头文件qtl_mysql.hpp。
include/qtl_async.hpp
New file
@@ -0,0 +1,421 @@
#ifndef _QTL_ASYNC_H_
#define _QTL_ASYNC_H_
#include <tuple>
#include <memory>
#include <chrono>
#include <functional>
namespace qtl
{
#ifdef _WIN32
    typedef SOCKET socket_type;
#else
    typedef int socket_type;
#endif
namespace detail
{
template<typename Values, typename RowHandler, typename FinishHandler>
struct async_fetch_helper : public std::enable_shared_from_this<async_fetch_helper<Values, RowHandler, FinishHandler>>
{
    async_fetch_helper(const Values& values, const RowHandler& row_handler, const FinishHandler& finish_handler)
        : m_values(values), m_row_handler(row_handler), m_finish_handler(finish_handler), m_auto_close_command(true)
    {
    }
    template<typename Command, typename Exception>
    void start(const std::shared_ptr<Command>& command)
    {
        auto self = this->shared_from_this();
        command->fetch(std::forward<Values>(m_values), [this, command]() {
            return qtl::detail::apply<RowHandler, Values>(std::forward<RowHandler>(m_row_handler), std::forward<Values>(m_values));
        }, [self, command](const Exception& e) {
            if (e || self->m_auto_close_command)
            {
                command->close([self, command, e](const Exception& new_e) {
                    self->m_finish_handler(e ? e : new_e);
                });
            }
            else
            {
                self->m_finish_handler(e);
            }
        });
    }
    void auto_close_command(bool auto_close)
    {
        m_auto_close_command = auto_close;
    }
private:
    Values m_values;
    RowHandler m_row_handler;
    FinishHandler m_finish_handler;
    bool m_auto_close_command;
};
template<typename Values, typename RowHandler, typename FinishHandler>
inline std::shared_ptr<async_fetch_helper<Values, RowHandler, FinishHandler>> make_fetch_helper(const Values& values, const RowHandler& row_handler, const FinishHandler& cpmplete_handler)
{
    return std::make_shared<async_fetch_helper<Values, RowHandler, FinishHandler>>(values, row_handler, cpmplete_handler);
}
/*
    Fetches the current result set of command with a single row handler.
    The command is left open afterwards (auto close is switched off), so the
    caller decides what happens to it once finish_handler has been called.
*/
template<typename Exception, typename Command, typename RowHandler, typename FinishHandler>
inline void async_fetch_command(const std::shared_ptr<Command>& command, FinishHandler&& finish_handler, RowHandler&& row_handler)
{
    auto row_values = make_values(row_handler);
    auto fetcher = make_fetch_helper(std::move(row_values), std::forward<RowHandler>(row_handler), std::forward<FinishHandler>(finish_handler));
    fetcher->auto_close_command(false);
    fetcher->template start<Command, Exception>(command);
}
/*
    Fetches several result sets in order: row_handler consumes the current one,
    then command->next_result() is requested and the remaining handlers are
    applied recursively. finish_handler is called with the first error, or by
    the single-handler overload once the last result set has been fetched.
*/
template<typename Exception, typename Command, typename RowHandler, typename FinishHandler, typename... OtherHandler>
inline void async_fetch_command(const std::shared_ptr<Command>& command, FinishHandler&& finish_handler, RowHandler&& row_handler, OtherHandler&&... other)
{
    auto on_result_fetched = [command, finish_handler, other...](const Exception& e) mutable {
        if (e)
        {
            finish_handler(e);
            return;
        }
        // Current result set is done: advance, then continue with the remaining handlers.
        command->next_result([=](const Exception& next_e) mutable {
            if (next_e)
            {
                finish_handler(next_e);
                return;
            }
            async_fetch_command<Exception>(command, std::forward<FinishHandler>(finish_handler), std::forward<OtherHandler>(other)...);
        });
    };
    async_fetch_command<Exception>(command, std::move(on_result_fetched), std::forward<RowHandler>(row_handler));
}
}
/*
    Interface of an item registered in the event loop.
    QTL only declares it; the user's event loop provides the implementation.
*/
struct event
{
    // I/O event flags (bit mask).
    enum io_flags
    {
        ef_read = 1 << 0,
        ef_write = 1 << 1,
        ef_exception = 1 << 2,
        ef_timeout = 1 << 3,
        // Note: ev_all does not include ef_timeout.
        ev_all = ef_read | ef_write | ef_exception
    };

    virtual ~event() { }

    // Sets the I/O handler for the given flags and timeout.
    virtual void set_io_handler(int flags, long timeout, std::function<void(int)>&&) = 0;

    // Removes this item from the event loop.
    virtual void remove() = 0;

    // Tells whether this item is currently waiting for I/O.
    virtual bool is_busying() = 0;
};
template<typename T, class Command>
class async_connection
{
public:
    template<typename EventLoop>
    bool bind(EventLoop& ev)
    {
        T* pThis = static_cast<T*>(this);
        if(m_event_handler)
        {
            if(m_event_handler->is_busying())
                return false;
            unbind();
        }
        m_event_handler = ev.add(pThis);
        return m_event_handler!=nullptr;
    }
    qtl::event* event() const
    {
        return m_event_handler;
    }
    bool unbind()
    {
        if(m_event_handler)
        {
            if(m_event_handler->is_busying())
                return false;
            m_event_handler->remove();
            m_event_handler=nullptr;
        }
        return true;
    }
    /*
        ResultHandler defines as:
            void handler(const exception_type& e, uint64_t affected=0);
        Copies will be made of the handler as required.
        If an error occurred, value of affected is undefined.
        Note: parameter affected must has default value.
    */
    template<typename Params, typename ResultHandler>
    void execute(ResultHandler&& handler, const char* query_text, size_t text_length, const Params& params)
    {
        T* pThis = static_cast<T*>(this);
        pThis->open_command(query_text, text_length, [handler, params](const typename T::exception_type& e, std::shared_ptr<Command>& command) mutable {
            if(e)
            {
                handler(e, 0);
                command->close([command, handler](const typename T::exception_type& e) mutable {
                    if(e) handler(e, 0);
                });
                return;
            }
            command->execute(params, [command, handler](const typename T::exception_type& e, uint64_t affected) mutable {
                if(e)
                {
                    handler(e, 0);
                    command->close([command, handler](const typename T::exception_type& e) mutable {
                        if(e) handler(e, 0);
                    });
                    return;
                }
                handler(typename T::exception_type(), affected);
                command->close([command, handler](const typename T::exception_type& e) mutable {
                    if(e) handler(e, 0);
                });
            });
        });
    }
    template<typename Params, typename ResultHandler>
    void execute(ResultHandler&& handler, const char* query_text, const Params& params, uint64_t* affected = NULL)
    {
        return execute(handler, query_text, strlen(query_text), params, affected);
    }
    template<typename Params, typename ResultHandler>
    void execute(ResultHandler&& handler, const std::string& query_text, const Params& params, uint64_t* affected = NULL)
    {
        return execute(handler, query_text.data(), query_text.length(), params, affected);
    }
    template<typename... Params, typename ResultHandler>
    void execute_direct(ResultHandler&& handler, const char* query_text, size_t text_length, uint64_t* affected, const Params&... params)
    {
        execute(handler, query_text, text_length, std::forward_as_tuple(params...), affected);
    }
    template<typename... Params, typename ResultHandler>
    void execute_direct(ResultHandler&& handler, const char* query_text, uint64_t* affected, const Params&... params)
    {
        execute(handler, query_text, std::forward_as_tuple(params...), affected);
    }
    template<typename... Params, typename ResultHandler>
    void execute_direct(ResultHandler&& handler, const std::string& query_text, uint64_t* affected, const Params&... params)
    {
        execute(handler, query_text, query_text, std::forward_as_tuple(params...), affected);
    }
    /*
        ResultHandler defines as:
            void handler(const exception_type& e, uint64_t insert_id=0);
        Copies will be made of the handler as required.
        If an error occurred, value of insert_id is undefined.
        If the command is not insert statement, value of insert_id is zero.
        Note: parameter insert_id must has default value.
    */
    template<typename Params, typename ResultHandler>
    void insert(ResultHandler&& handler, const char* query_text, size_t text_length, const Params& params)
    {
        T* pThis = static_cast<T*>(this);
        pThis->open_command(query_text, text_length, [handler, params](const typename T::exception_type& e, std::shared_ptr<Command>& command) {
            if(e)
            {
                handler(e, 0);
            }
            else
            {
                command->execute(params, [command, handler](const typename T::exception_type& e, uint64_t affected) {
                    if(e)
                    {
                        handler(e, 0);
                    }
                    else if (affected > 0)
                    {
                        handler(typename T::exception_type(), command->insert_id());
                    }
                    else
                    {
                        handler(typename T::exception_type(), 0);
                    }
                    command->close([handler](const typename T::exception_type& e) {
                        handler(e, 0);
                    });
                });
            }
        });
    }
    template<typename Params, typename ResultHandler>
    void insert(ResultHandler&& handler, const char* query_text, const Params& params)
    {
        insert(handler, query_text, strlen(query_text), params);
    }
    template<typename Params, typename ResultHandler>
    void insert(ResultHandler&& handler, const std::string& query_text, const Params& params)
    {
        insert(handler, query_text.data(), query_text.length(), params);
    }
    template<typename... Params, typename ResultHandler>
    void insert_direct(ResultHandler&& handler, const char* query_text, size_t text_length, const Params&... params)
    {
        insert(handler, query_text, text_length, std::forward_as_tuple(params...));
    }
    template<typename... Params, typename ResultHandler>
    void insert_direct(ResultHandler&& handler, const char* query_text, const Params&... params)
    {
        insert(handler, query_text, strlen(query_text), std::forward_as_tuple(params...));
    }
    template<typename... Params, typename ResultHandler>
    void insert_direct(ResultHandler&& handler, const std::string& query_text, const Params&... params)
    {
        insert(handler, query_text.data(), query_text.length(), std::forward_as_tuple(params...));
    }
    /*
        RowHandler defines as:
            void row_handler(const Values& values);
        FinishHandler defines as:
            void finish_handler(const exception_type& e);
        If a row is fetched, the row handler is called.
        If an error occurred or the operation is completed, the result handler is called.
    */
    template<typename Params, typename Values, typename RowHandler, typename FinishHandler>
    void query_explicit(const char* query_text, size_t text_length, const Params& params, Values&& values, RowHandler&& row_handler, FinishHandler&& finish_handler)
    {
        T* pThis = static_cast<T*>(this);
        pThis->open_command(query_text, text_length, [values, row_handler, finish_handler, params](const typename T::exception_type& e, const std::shared_ptr<Command>& command) mutable {
            if(e)
            {
                finish_handler(e);
            }
            else
            {
                command->execute(params, [command, values, row_handler, finish_handler](const typename T::exception_type& e, uint64_t affected) mutable {
                    auto helper=detail::make_fetch_helper(values, row_handler, finish_handler);
                    helper->template start<Command, typename T::exception_type>(command);
                });
            }
        });
    }
    template<typename Params, typename Values, typename RowHandler, typename FinishHandler>
    void query_explicit(const char* query_text, const Params& params, Values&& values, RowHandler&& row_handler, FinishHandler&& finish_handler)
    {
        query_explicit(query_text, strlen(query_text), params, std::forward<Values>(values), std::forward<RowHandler>(row_handler), std::forward<FinishHandler>(finish_handler));
    }
    template<typename Params, typename Values, typename RowHandler, typename FinishHandler>
    void query_explicit(const std::string& query_text, const Params& params, Values&& values, RowHandler&& row_handler, FinishHandler&& finish_handler)
    {
        query_explicit(query_text.data(), query_text.size(), params, std::forward<Values>(values), std::forward<RowHandler>(row_handler), std::forward<FinishHandler>(finish_handler));
    }
    template<typename Values, typename RowHandler, typename FinishHandler>
    void query_explicit(const char* query_text, size_t text_length, Values&& values, RowHandler&& row_handler, FinishHandler&& finish_handler)
    {
        query_explicit(query_text, text_length, std::make_tuple(), std::forward<Values>(values), std::forward<RowHandler>(row_handler), std::forward<FinishHandler>(finish_handler));
    }
    template<typename Values, typename RowHandler, typename FinishHandler>
    void query_explicit(const char* query_text, Values&& values, RowHandler&& row_handler, FinishHandler&& finish_handler)
    {
        query_explicit(query_text, strlen(query_text), std::make_tuple(), std::forward<Values>(values), std::forward<RowHandler>(row_handler), std::forward<FinishHandler>(finish_handler));
    }
    template<typename Values, typename RowHandler, typename FinishHandler>
    void query_explicit(const std::string& query_text, Values&& values, RowHandler&& row_handler, FinishHandler&& finish_handler)
    {
        query_explicit(query_text, std::make_tuple(), std::forward<Values>(values), std::forward<RowHandler>(row_handler), std::forward<FinishHandler>(finish_handler));
    }
    template<typename Params, typename RowHandler, typename FinishHandler>
    void query(const char* query_text, size_t text_length, const Params& params, RowHandler&& row_handler, FinishHandler&& finish_handler)
    {
        query_explicit(query_text, text_length, params, detail::make_values(row_handler), std::forward<RowHandler>(row_handler), std::forward<FinishHandler>(finish_handler));
    }
    template<typename Params, typename RowHandler, typename FinishHandler>
    void query(const char* query_text, const Params& params, RowHandler&& row_handler, FinishHandler&& finish_handler)
    {
        query_explicit(query_text, params, detail::make_values(row_handler), std::forward<RowHandler>(row_handler), std::forward<FinishHandler>(finish_handler));
    }
    template<typename Params, typename RowHandler, typename FinishHandler>
    void query(const std::string& query_text, const Params& params, RowHandler&& row_handler, FinishHandler&& finish_handler)
    {
        query_explicit(query_text, params, detail::make_values(row_handler), std::forward<RowHandler>(row_handler), std::forward<FinishHandler>(finish_handler));
    }
    template<typename RowHandler, typename FinishHandler>
    void query(const char* query_text, size_t text_length, RowHandler&& row_handler, FinishHandler&& finish_handler)
    {
        query_explicit(query_text, text_length, detail::make_values(row_handler), std::forward<RowHandler>(row_handler), std::forward<FinishHandler>(finish_handler));
    }
    template<typename RowHandler, typename FinishHandler>
    void query(const char* query_text, RowHandler&& row_handler, FinishHandler&& finish_handler)
    {
        query_explicit(query_text, detail::make_values(row_handler), std::forward<RowHandler>(row_handler), std::forward<FinishHandler>(finish_handler));
    }
    template<typename RowHandler, typename FinishHandler>
    void query(const std::string& query_text, RowHandler&& row_handler, FinishHandler&& finish_handler)
    {
        query_explicit(query_text, detail::make_values(row_handler), std::forward<RowHandler>(row_handler), std::forward<FinishHandler>(finish_handler));
    }
    template<typename Params, typename FinishHandler, typename... RowHandlers>
    void query_multi_with_params(const char* query_text, size_t text_length, const Params& params, FinishHandler&& finish_handler, RowHandlers&&... row_handlers)
    {
        T* pThis = static_cast<T*>(this);
        pThis->open_command(query_text, text_length, [params, finish_handler, row_handlers...](const typename T::exception_type& e, const std::shared_ptr<Command>& command) mutable {
            if (e)
            {
                finish_handler(e);
            }
            else
            {
                command->execute(params, [=](const typename T::exception_type& e, uint64_t affected) mutable {
                    if (e)
                        finish_handler(e);
                    else
                        qtl::detail::async_fetch_command<typename T::exception_type>(command, std::forward<FinishHandler>(finish_handler), std::forward<RowHandlers>(row_handlers)...);
                });
            }
        });
    }
    template<typename Params, typename FinishHandler, typename... RowHandlers>
    void query_multi_with_params(const char* query_text, const Params& params, FinishHandler&& finish_handler, RowHandlers&&... row_handlers)
    {
        query_multi_with_params(query_text, strlen(query_text), params, std::forward<FinishHandler>(finish_handler), std::forward<RowHandlers>(row_handlers)...);
    }
    template<typename Params, typename FinishHandler, typename... RowHandlers>
    void query_multi_with_params(const std::string& query_text, const Params& params, FinishHandler&& finish_handler, RowHandlers&&... row_handlers)
    {
        query_multi_with_params(query_text.data(), query_text.size(), params, std::forward<FinishHandler>(finish_handler), std::forward<RowHandlers>(row_handlers)...);
    }
    template<typename FinishHandler, typename... RowHandlers>
    void query_multi(const char* query_text, size_t text_length, FinishHandler&& finish_handler, RowHandlers&&... row_handlers)
    {
        query_multi_with_params<std::tuple<>, FinishHandler, RowHandlers...>(query_text, text_length, std::make_tuple(), std::forward<FinishHandler>(finish_handler), std::forward<RowHandlers>(row_handlers)...);
    }
    template<typename FinishHandler, typename... RowHandlers>
    void query_multi(const char* query_text, FinishHandler&& finish_handler, RowHandlers&&... row_handlers)
    {
        query_multi_with_params<std::tuple<>, FinishHandler, RowHandlers...>(query_text, strlen(query_text), std::make_tuple(), std::forward<FinishHandler>(finish_handler), std::forward<RowHandlers>(row_handlers)...);
    }
    template<typename FinishHandler, typename... RowHandlers>
    void query_multi(const std::string& query_text, FinishHandler&& finish_handler, RowHandlers&&... row_handlers)
    {
        query_multi_with_params<std::tuple<>, FinishHandler, RowHandlers...>(query_text.data(), query_text.size(), std::make_tuple(), std::forward<FinishHandler>(finish_handler), std::forward<RowHandlers>(row_handlers)...);
    }
protected:
    qtl::event* m_event_handler { nullptr };
};
}
#endif //_QTL_ASYNC_H_
include/qtl_common.hpp
@@ -1,13 +1,19 @@
#ifndef _MYDTL_DATABASE_H_
#define _MYDTL_DATABASE_H_
#ifndef _QTL_COMMON_H_
#define _QTL_COMMON_H_
#if defined(_MSC_VER)
#if _MSC_VER<1800
#error MYDTL need C++11 compiler
#error QTL need C++11 compiler
#endif //MSC
#elif __cplusplus<201103L
#error MYDTL need C++11 compiler
#error QTL need C++11 compiler
#endif //C++11
#if _MSC_VER>=1800 && _MSC_VER<1900
#define NOEXCEPT throw()
#else
#define NOEXCEPT noexcept
#endif //NOEXCEPT
#include <stdint.h>
#include <string.h>
@@ -1167,4 +1173,4 @@
}
#endif //_MYDTL_DATABASE_H_
#endif //_QTL_COMMON_H_
include/qtl_database_pool.hpp
@@ -137,6 +137,7 @@
            return;
        m_trying_connection=true;
        try
        {
            m_background_thread=std::thread(&database_pool<Database>::background_connect, this);
@@ -175,6 +176,170 @@
    }
};
/*
    CRTP base class of a pool of asynchronous connections.
    T must provide
        new_connection(EventLoop& ev, handler)
    which calls handler(const exception_type& e, Connection* db); a null db means failure.
    Connection must provide exception_type, bind(EventLoop&), unbind() and is_alive(handler).
    Pointers handed out by get() return their connection to the pool when released;
    their deleter refers to the pool, so the pool has to outlive all of them.
*/
template<typename T, typename EventLoop, typename Connection>
class async_pool
{
public:
    typedef Connection value_type;
    typedef std::shared_ptr<Connection> pointer;
    async_pool(EventLoop& ev)
        : m_ev(ev), m_trying_connecting(false)
    {
    }
    virtual ~async_pool()
    {
        clear();
    }
    /*
        Handler defines as:
        void handler(const typename Connection::exception_type& e, const pointer& ptr);
        An idle connection is bound to the event loop (ev, or the pool's own loop when
        ev is null) and passed to the handler; if none is idle a new one is created.
        While the pool is reconnecting, the handler gets no error and a null pointer.
    */
    template<typename Handler>
    void get(Handler&& handler, EventLoop* ev=nullptr)
    {
        Connection* db = popup();
        if(ev==nullptr) ev=&m_ev;
        if(db)
        {
            db->bind(*ev);
            handler(typename Connection::exception_type(), wrap(db));
        }
        else if (m_trying_connecting == false)
        {
            create_connection(ev, [this, handler](const typename Connection::exception_type& e,  Connection* db) {
                handler(e, wrap(db));
            });
        }
        else
        {
            handler(typename Connection::exception_type(), nullptr);
        }
    }
    /*
        Checks every idle connection with is_alive(); a connection reporting an error is
        removed from the pool and destroyed. When the pool becomes empty a reconnect is started.
    */
    void test_alive()
    {
        std::lock_guard<std::recursive_mutex> lock(m_pool_mutex);
        if (m_connections.empty())
            return;
        // Iterate over a snapshot: a callback that runs synchronously may erase from,
        // clear or append to m_connections, which would invalidate iterators into it.
        const std::vector<Connection*> snapshot(m_connections);
        for (Connection* db : snapshot)
        {
            // Skip entries that an earlier callback already removed (and possibly destroyed).
            if (std::find(m_connections.begin(), m_connections.end(), db) == m_connections.end())
                continue;
            db->is_alive([this, db](const typename Connection::exception_type& e) {
                if (!e)
                    return;
                std::lock_guard<std::recursive_mutex> guard(m_pool_mutex);
                auto it = std::find(m_connections.begin(), m_connections.end(), db);
                // No longer pooled: it was handed out or destroyed meanwhile, leave it alone.
                if (it == m_connections.end())
                    return;
                m_connections.erase(it);
                delete db;
                if (m_connections.empty())
                    try_connect();
            });
        }
    }
private:
    EventLoop& m_ev;
    // Idle connections owned by the pool.
    std::vector<Connection*> m_connections;
    // Recursive: callbacks may re-enter the pool on the thread that already holds the lock.
    std::recursive_mutex m_pool_mutex;
    // True while a reconnect started by try_connect() is in progress.
    std::atomic<bool> m_trying_connecting;
    /*
        Deleter of the pointers created by wrap(): puts a healthy connection back
        into the pool. If the connection is dead, it is destroyed together with all
        idle connections and a reconnect is started.
    */
    void recovery(Connection* db)
    {
        if (db == nullptr) return;
        db->is_alive([this, db](const typename Connection::exception_type& e) {
            if (e)
            {
                {
                    std::lock_guard<std::recursive_mutex> lock(m_pool_mutex);
                    clear();
                }
                // db was checked out, so clear() did not cover it; destroy it here
                // (as test_alive does for a dead connection) instead of leaking it.
                delete db;
                try_connect();
            }
            else
            {
                if(!db->unbind())
                    throw std::runtime_error("destroy a busysing connection.");
                std::lock_guard<std::recursive_mutex> lock(m_pool_mutex);
                m_connections.push_back(db);
            }
        });
    }
    /*
        Asks T::new_connection for a connection and passes the result to handler.
        On failure (null db) the idle connections are dropped and a reconnect is
        scheduled on the pool's event loop after one second.
    */
    template<typename Handler>
    void create_connection(EventLoop* ev, Handler&& handler)
    {
        T* pThis = static_cast<T*>(this);
        pThis->new_connection(*ev, [this, handler](const typename Connection::exception_type& e, Connection* db) {
            handler(e, db);
            if (!db)
            {
                std::lock_guard<std::recursive_mutex> lock(m_pool_mutex);
                clear();
                timeval tv = { 1,  0};
                m_ev.set_timeout(tv, [this]() {
                    try_connect();
                });
            }
        });
    }
    // Takes the most recently returned idle connection, or nullptr if there is none.
    Connection* popup()
    {
        Connection* db = nullptr;
        std::lock_guard<std::recursive_mutex> lock(m_pool_mutex);
        if (!m_connections.empty())
        {
            db = m_connections.back();
            m_connections.pop_back();
        }
        return db;
    }
    /*
        Starts a reconnect unless one is already running; the new connection goes
        into the pool. The flag is cleared when the attempt finishes, whatever its
        outcome, otherwise get() could never create a connection again after a
        successful reconnect.
    */
    void try_connect()
    {
        // exchange() makes "test and set" a single atomic step.
        if (m_trying_connecting.exchange(true))
            return;
        create_connection(&m_ev, [this](const typename Connection::exception_type& e, Connection* db) {
            if (db)
            {
                std::lock_guard<std::recursive_mutex> lock(m_pool_mutex);
                m_connections.push_back(db);
            }
            m_trying_connecting = false;
        });
    }
    // Destroys all idle connections. Callers other than the destructor hold m_pool_mutex.
    void clear()
    {
        std::for_each(m_connections.begin(), m_connections.end(), std::default_delete<Connection>());
        m_connections.clear();
    }
    // Wraps db into a pointer whose deleter returns the connection to this pool.
    pointer wrap(Connection* db)
    {
        if (db)
        {
            return pointer(db, [this](Connection* db) {
                recovery(db);
            });
        }
        else return nullptr;
    }
};
}
#endif //_QTL_DATABASE_POOL_H_
include/qtl_mysql.hpp
@@ -1,5 +1,5 @@
#ifndef _MYDTL_MYSQL_H_
#define _MYDTL_MYSQL_H_
#ifndef _QTL_MYSQL_H_
#define _QTL_MYSQL_H_
#include <mysql.h>
#include <errmsg.h>
@@ -12,7 +12,17 @@
#include <array>
#include <functional>
#include <algorithm>
#include <system_error>
#include "qtl_common.hpp"
#include "qtl_async.hpp"
#if LIBMYSQL_VERSION_ID >=80000
typedef bool my_bool;
#endif //MySQL 8
#ifdef MARIADB_VERSION_ID
#define IS_LONGDATA(t) ((t) >= MYSQL_TYPE_TINY_BLOB && (t) <= MYSQL_TYPE_STRING)
#endif //MariaDB
namespace qtl
{
@@ -20,13 +30,9 @@
namespace mysql
{
#if LIBMYSQL_VERSION_ID >=80000
typedef bool my_bool;
#endif //MySQL 8
struct init
{
    init(int argc=-1, char **argv=NULL, char **groups=NULL)
    init(int argc=-1, char **argv=nullptr, char **groups=nullptr)
    {
        //my_init();
        mysql_library_init(argc, argv, groups);
@@ -192,19 +198,20 @@
    binder.bind(const_cast<char*>(str), static_cast<unsigned long>(length), MYSQL_TYPE_VAR_STRING);
}
class statement;
class database;
class base_statement;
class basic_database;
class error : public std::exception
{
public:
    error() : m_error(0) { }
    error(unsigned int err, const char* errmsg) : m_error(err), m_errmsg(errmsg) { }
    explicit error(statement& stmt);
    explicit error(database& db);
    explicit error(const base_statement& stmt);
    explicit error(const basic_database& db);
    error(const error& src) = default;
    virtual ~error() throw() { }
    int code() const throw() { return m_error; }
    operator bool() const { return m_error != 0;  }
    virtual const char* what() const throw() override { return m_errmsg.data(); }
private:
    unsigned int m_error;
@@ -401,8 +408,6 @@
        }
    }
private:
    MYSQL_STMT* m_stmt;
    binder m_binder;
@@ -443,94 +448,87 @@
typedef std::function<void(std::ostream&)> blob_writer;
class statement final
class base_statement
{
public:
    statement() : m_stmt(NULL), m_result(NULL) {}
    explicit statement(database& db);
    statement(const statement&) = delete;
    statement(statement&& src)
protected:
    base_statement() : m_stmt(nullptr), m_result(nullptr) {}
    base_statement(const base_statement&) = delete;
    explicit base_statement(basic_database& db);
    base_statement(base_statement&& src)
        : m_stmt(src.m_stmt), m_result(src.m_result),
        m_binders(std::move(src.m_binders)), m_binderAddins(std::move(src.m_binderAddins))
    {
        src.m_stmt=NULL;
        src.m_result=NULL;
        src.m_stmt=nullptr;
        src.m_result=nullptr;
    }
    statement& operator=(const statement&) = delete;
    statement& operator=(statement&& src)
    base_statement& operator=(const base_statement&) = delete;
    base_statement& operator=(base_statement&& src)
    {
        if(this!=&src)
        {
            m_stmt=src.m_stmt;
            m_result=src.m_result;
            src.m_stmt=NULL;
            src.m_result=NULL;
            src.m_stmt=nullptr;
            src.m_result=nullptr;
            m_binders=std::move(src.m_binders);
            m_binderAddins=std::move(src.m_binderAddins);
        }
        return *this;
    }
    ~statement()
    {
        close();
    }
public:
    operator MYSQL_STMT*() { return m_stmt; }
    void open(const char *query_text, unsigned long text_length=0)
    unsigned int get_parameter_count() const { return mysql_stmt_param_count(m_stmt); }
    unsigned int get_column_count() const { return mysql_stmt_field_count(m_stmt); }
    unsigned long length(unsigned int index) const
    {
        mysql_stmt_reset(m_stmt);
        if(text_length==0) text_length=(unsigned long)strlen(query_text);
        if(mysql_stmt_prepare(m_stmt, query_text, text_length)!=0)
            throw_exception();
        return m_binderAddins[index].m_length;
    }
    void execute()
    bool is_null(unsigned int index) const
    {
        resize_binders(0);
        if(mysql_stmt_execute(m_stmt)!=0)
            throw_exception();
        return m_binderAddins[index].m_isNull!=0;
    }
    template<typename Types>
    void execute(const Types& params)
    size_t find_field(const char* name) const
    {
        unsigned long count=mysql_stmt_param_count(m_stmt);
        if(count>0)
        if(m_result)
        {
            resize_binders(count);
            qtl::bind_params(*this, params);
            if(mysql_stmt_bind_param(m_stmt, &m_binders.front()))
                throw_exception();
            for(size_t i=0; i!=count; i++)
            for(size_t i=0; i!=m_result->field_count; i++)
            {
                if(m_binderAddins[i].m_after_fetch)
                    m_binderAddins[i].m_after_fetch(m_binders[i]);
                if(strncmp(m_result->fields[i].name, name, m_result->fields[i].name_length)==0)
                    return i;
            }
        }
        if(mysql_stmt_execute(m_stmt)!=0)
            throw_exception();
        return -1;
    }
    template<typename Types>
    bool fetch(Types&& values)
    my_ulonglong affetced_rows()
    {
        if(m_result==NULL)
        return mysql_stmt_affected_rows(m_stmt);
    }
    my_ulonglong insert_id()
        {
            unsigned long count=mysql_stmt_field_count(m_stmt);
            if(count>0)
        return mysql_stmt_insert_id(m_stmt);
    }
    binder* get_binder(unsigned long index)
            {
                m_result=mysql_stmt_result_metadata(m_stmt);
                if(m_result==NULL) throw_exception();
                resize_binders(count);
                qtl::bind_record(*this, std::forward<Types>(values));
                set_binders();
                if(mysql_stmt_bind_result(m_stmt, m_binders.data())!=0)
                    throw_exception();
        return &m_binders[index];
            }
    unsigned int error() const
    {
        return mysql_stmt_errno(m_stmt);
        }
        return fetch();
    const char* errmsg() const
    {
        return mysql_stmt_error(m_stmt);
    }
    MYSQL_RES* result() { return m_result; }
    void bind_param(size_t index, const char* param, size_t length)
    {
@@ -544,7 +542,7 @@
    void bind_param(size_t index, std::istream& param)
    {
        m_binders[index].bind(NULL, 0, MYSQL_TYPE_LONG_BLOB);
        m_binders[index].bind(nullptr, 0, MYSQL_TYPE_LONG_BLOB);
        m_binderAddins[index].m_after_fetch=[this, index, &param](const binder&) {
            std::array<char, blob_buffer_size> buffer;
            unsigned long readed=0;
@@ -563,7 +561,7 @@
    void bind_param(size_t index, const blob_writer& param)
    {
        m_binders[index].bind(NULL, 0, MYSQL_TYPE_LONG_BLOB);
        m_binders[index].bind(nullptr, 0, MYSQL_TYPE_LONG_BLOB);
        m_binderAddins[index].m_after_fetch = [this, index, &param](const binder& b) {
            blobbuf buf;
            buf.open(m_stmt, index, b, std::ios::out);
@@ -614,7 +612,7 @@
        if(m_result)
        {
            MYSQL_FIELD* field=mysql_fetch_field_direct(m_result, (unsigned int)index);
            if(field==NULL) throw_exception();
            if (field == nullptr) throw_exception();
            value.clear();
            typename bind_string_helper<T>::char_type* data=value.alloc(field->length);
            m_binderAddins[index].m_before_fetch = [this, value](binder& b) mutable {
@@ -642,7 +640,7 @@
        {
            MYSQL_FIELD* field = mysql_fetch_field_direct(m_result, (unsigned int)index);
            assert(IS_LONGDATA(field->type));
            m_binders[index].bind(NULL, 0, field->type);
            m_binders[index].bind(nullptr, 0, field->type);
            m_binderAddins[index].m_after_fetch=[this, index, &value](const binder& b) {
                unsigned long readed=0;
                std::array<char, blob_buffer_size> buffer;
@@ -668,7 +666,7 @@
        {
            MYSQL_FIELD* field = mysql_fetch_field_direct(m_result, (unsigned int)index);
            assert(IS_LONGDATA(field->type));
            m_binders[index].bind(NULL, 0, field->type);
            m_binders[index].bind(nullptr, 0, field->type);
            m_binderAddins[index].m_after_fetch = [this, index, &value](const binder& b) {
                if (*b.is_null) return;
                value.open(m_stmt, index, b, std::ios::in);
@@ -693,113 +691,21 @@
        }
    }
    unsigned int get_parameter_count() const { return mysql_stmt_param_count(m_stmt); }
    unsigned int get_column_count() const { return mysql_stmt_field_count(m_stmt); }
    unsigned long length(unsigned int index) const
    {
        return m_binderAddins[index].m_length;
    }
    bool is_null(unsigned int index) const
    {
        return m_binderAddins[index].m_isNull!=0;
    }
    size_t find_field(const char* name) const
    {
        if(m_result)
        {
            for(size_t i=0; i!=m_result->field_count; i++)
            {
                if(strncmp(m_result->fields[i].name, name, m_result->fields[i].name_length)==0)
                    return i;
            }
        }
        return -1;
    }
    void close()
    {
        if(m_result)
        {
            mysql_free_result(m_result);
            m_result=NULL;
            m_result = nullptr;
        }
        if(m_stmt)
        {
            mysql_stmt_close(m_stmt);
            m_stmt=NULL;
            m_stmt = nullptr;
        }
    }
    bool fetch()
    {
        for (size_t i = 0; i != m_binders.size(); i++)
        {
            if (m_binderAddins[i].m_before_fetch)
                m_binderAddins[i].m_before_fetch(m_binders[i]);
        }
        int err=mysql_stmt_fetch(m_stmt);
        if(err==0 || err==MYSQL_DATA_TRUNCATED)
        {
            for(size_t i=0; i!=m_binders.size(); i++)
            {
                m_binderAddins[i].is_truncated = (err==MYSQL_DATA_TRUNCATED);
                if(m_binderAddins[i].m_after_fetch)
                    m_binderAddins[i].m_after_fetch(m_binders[i]);
            }
            return true;
        }
        else if(err==1)
            throw_exception();
        return false;
    }
    bool next_result()
    {
        if(m_result)
        {
            mysql_free_result(m_result);
            m_result=NULL;
            mysql_stmt_free_result(m_stmt);
        }
        int ret=0;
        do
        {
            ret=mysql_stmt_next_result(m_stmt);
            if(ret>0) throw_exception();
        }while(ret==0 && mysql_stmt_field_count(m_stmt)<=0);
        return ret==0;
    }
    my_ulonglong affetced_rows()
    {
        return mysql_stmt_affected_rows(m_stmt);
    }
    my_ulonglong insert_id()
    {
        return mysql_stmt_insert_id(m_stmt);
    }
    binder* get_binder(unsigned long index)
    {
        return &m_binders[index];
    }
    unsigned int error() const
    {
        return mysql_stmt_errno(m_stmt);
    }
    const char* errmsg() const
    {
        return mysql_stmt_error(m_stmt);
    }
    MYSQL_RES* result() { return m_result; }
    bool reset() { return mysql_stmt_reset(m_stmt)!=0; }
private:
protected:
    MYSQL_STMT* m_stmt;
    MYSQL_RES* m_result;
    std::vector<binder> m_binders;
@@ -833,9 +739,8 @@
        }
    }
    void throw_exception() { throw mysql::error(*this); }
    void throw_exception() const { throw mysql::error(*this); }
private:
    template<typename Value>
    struct if_null
    {
@@ -847,35 +752,228 @@
        Value& m_value;
        Value m_def;
    };
};
class database final : public qtl::base_database<database, statement>
class statement : public base_statement
{
public:
    statement() = default;
    explicit statement(basic_database& db) : base_statement(db) { }
    statement(statement&& src) : base_statement(std::move(src)) { }
    statement& operator=(statement&& src)
    {
        base_statement::operator =(std::move(src));
        return *this;
    }
    ~statement()
    {
        close();
    }
    void open(const char *query_text, unsigned long text_length=0)
    {
        mysql_stmt_reset(m_stmt);
        if(text_length==0) text_length=(unsigned long)strlen(query_text);
        if(mysql_stmt_prepare(m_stmt, query_text, text_length)!=0)
            throw_exception();
    }
    void execute()
    {
        resize_binders(0);
        if(mysql_stmt_execute(m_stmt)!=0)
            throw_exception();
    }
    template<typename Types>
    void execute(const Types& params)
    {
        unsigned long count=mysql_stmt_param_count(m_stmt);
        if(count>0)
        {
            resize_binders(count);
            qtl::bind_params(*this, params);
            if(mysql_stmt_bind_param(m_stmt, &m_binders.front()))
                throw_exception();
            for(size_t i=0; i!=count; i++)
            {
                if(m_binderAddins[i].m_after_fetch)
                    m_binderAddins[i].m_after_fetch(m_binders[i]);
            }
        }
        if(mysql_stmt_execute(m_stmt)!=0)
            throw_exception();
    }
    template<typename Types>
    bool fetch(Types&& values)
    {
        if(m_result==nullptr)
        {
            unsigned long count=mysql_stmt_field_count(m_stmt);
            if(count>0)
            {
                m_result=mysql_stmt_result_metadata(m_stmt);
                if(m_result==nullptr) throw_exception();
                resize_binders(count);
                qtl::bind_record(*this, std::forward<Types>(values));
                set_binders();
                if(mysql_stmt_bind_result(m_stmt, m_binders.data())!=0)
                    throw_exception();
            }
        }
        return fetch();
    }
    bool fetch()
    {
        for (size_t i = 0; i != m_binders.size(); i++)
        {
            if (m_binderAddins[i].m_before_fetch)
                m_binderAddins[i].m_before_fetch(m_binders[i]);
        }
        int err=mysql_stmt_fetch(m_stmt);
        if(err==0 || err==MYSQL_DATA_TRUNCATED)
        {
            for(size_t i=0; i!=m_binders.size(); i++)
            {
                m_binderAddins[i].is_truncated = (err==MYSQL_DATA_TRUNCATED);
                if(m_binderAddins[i].m_after_fetch)
                    m_binderAddins[i].m_after_fetch(m_binders[i]);
            }
            return true;
        }
        else if(err==1)
            throw_exception();
        return false;
    }
    bool next_result()
    {
        if(m_result)
        {
            mysql_free_result(m_result);
            m_result=nullptr;
            mysql_stmt_free_result(m_stmt);
        }
        int ret=0;
        do
        {
            ret=mysql_stmt_next_result(m_stmt);
            if(ret>0) throw_exception();
        }while(ret==0 && mysql_stmt_field_count(m_stmt)<=0);
        return ret==0;
    }
    bool reset() { return mysql_stmt_reset(m_stmt)!=0; }
};
/*
struct LocalInfile
{
    int read(char *buf, unsigned int buf_len);
    void close();
    int error(char *error_msg, unsigned int error_msg_len);
};
*/
/**
 * Adapts a LocalInfile class to the four C callbacks that
 * mysql_set_local_infile_handler() expects.
 *
 * LocalInfile must provide:
 *   - a constructor taking `const char* filename` (may throw on failure)
 *   - int read(char *buf, unsigned int buf_len)
 *   - void close()
 *   - int error(char *error_msg, unsigned int error_msg_len)
 */
template<typename LocalInfile>
struct local_infile_factory
{
    /// Creates a LocalInfile for `filename` and hands it back through `*ptr`.
    /// Returns 0 on success, -1 when `ptr` is null or construction throws;
    /// on failure `*ptr` is left null.
    static int local_infile_init(void **ptr, const char *filename, void *userdata)
    {
        (void)userdata;
        if (ptr == nullptr)
            return -1;
        *ptr = nullptr;
        try
        {
            // The handle must be published through *ptr, otherwise the other
            // callbacks never see the object and it leaks.
            *ptr = new LocalInfile(filename);
        }
        catch (...)
        {
            return -1;
        }
        return 0;
    }
    /// Forwards to LocalInfile::read on the object created by local_infile_init.
    static int local_infile_read(void *ptr, char *buf, unsigned int buf_len)
    {
        LocalInfile* object = static_cast<LocalInfile*>(ptr);
        return object->read(buf, buf_len);
    }
    /// Closes and destroys the object. The client library calls this even
    /// when initialisation failed, so a null handle is tolerated.
    static void local_infile_end(void *ptr)
    {
        LocalInfile* object = static_cast<LocalInfile*>(ptr);
        if (object == nullptr)
            return;
        object->close();
        delete object;
    }
    /// Forwards to LocalInfile::error. With a null handle (failed init) there
    /// is no object to ask, so the message is cleared and -1 is returned.
    static int local_infile_error(void *ptr, char *error_msg, unsigned int error_msg_len)
    {
        LocalInfile* object = static_cast<LocalInfile*>(ptr);
        if (object == nullptr)
        {
            if (error_msg != nullptr && error_msg_len > 0)
                error_msg[0] = '\0';
            return -1;
        }
        return object->error(error_msg, error_msg_len);
    }
};
/**
 * LocalInfile implementation that reads from a file through C stdio.
 */
class local_file
{
public:
    /// Opens `filename` for binary reading.
    /// Throws std::system_error built from errno when fopen fails.
    local_file(const char* filename)
    {
        m_fp = fopen(filename, "rb");
        if (m_fp == nullptr)
            throw std::system_error(std::make_error_code(std::errc(errno)));
    }
    /// Reads up to `buf_len` bytes into `buf`.
    /// Returns the number of bytes read, 0 at end of file, or -1 on error
    /// (including a read after close()).
    int read(char *buf, unsigned int buf_len)
    {
        if (m_fp == nullptr)
        {
            m_errno = EBADF;
            return -1;
        }
        size_t count = fread(buf, 1, buf_len, m_fp);
        if (count == 0 && ferror(m_fp))
        {
            // remember the cause now; errno may be overwritten before error() runs
            m_errno = errno;
            return -1;
        }
        return static_cast<int>(count);
    }
    /// Closes the file; calling it again is a no-op.
    void close()
    {
        if (m_fp != nullptr)
        {
            fclose(m_fp);
            m_fp = nullptr;
        }
    }
    /// Writes the text for the last recorded error (or the current errno when
    /// none was recorded) into `error_msg` and returns the error code.
    int error(char *error_msg, unsigned int error_msg_len)
    {
        int errcode = (m_errno != 0) ? m_errno : errno;
        // guard the length: `error_msg_len - 1` would wrap around for 0
        if (error_msg != nullptr && error_msg_len > 0)
        {
            memset(error_msg, 0, error_msg_len);
            strncpy(error_msg, strerror(errcode), error_msg_len - 1);
        }
        return errcode;
    }
private:
    FILE* m_fp;
    int m_errno = 0;
};
class basic_database
{
protected:
    basic_database()
    {
        m_mysql = mysql_init(nullptr);
    }
public:
    typedef mysql::error exception_type;
    database()
    ~basic_database()
    {
        m_mysql=mysql_init(NULL);
    }
    ~database()
    {
        if(m_mysql)
        mysql_close(m_mysql);
    }
    database(const database&) = delete;
    database(database&& src)
    basic_database(const basic_database&) = delete;
    basic_database(basic_database&& src)
    {
        m_mysql=src.m_mysql;
        src.m_mysql=NULL;
        src.m_mysql=nullptr;
    }
    database& operator==(const database&) = delete;
    database& operator==(database&& src)
    basic_database& operator==(const basic_database&) = delete;
    basic_database& operator==(basic_database&& src)
    {
        if(this!=&src)
        {
            if(m_mysql)
            mysql_close(m_mysql);
            m_mysql=src.m_mysql;
            src.m_mysql=NULL;
            src.m_mysql=nullptr;
        }
        return *this;
    }
@@ -901,29 +999,6 @@
        return options(MYSQL_OPT_RECONNECT, &re);
    }
    bool open(const char *host, const char *user, const char *passwd, const char *db,
        unsigned long clientflag=0, unsigned int port=0, const char *unix_socket=NULL)
    {
        if(m_mysql==NULL) m_mysql=mysql_init(NULL);
        return mysql_real_connect(m_mysql, host, user, passwd, db, port, unix_socket, clientflag)!=NULL;
    }
    void close()
    {
        mysql_close(m_mysql);
        m_mysql=NULL;
    }
    void refresh(unsigned int options)
    {
        if(mysql_refresh(m_mysql, options)<0)
            throw_exception();
    }
    void select(const char* db)
    {
        if(mysql_select_db(m_mysql, db)!=0)
            throw_exception();
    }
    const char* current() const
    {
        return m_mysql->db;
@@ -936,6 +1011,49 @@
    const char* errmsg() const
    {
        return mysql_error(m_mysql);
    }
    uint64_t affected_rows()
    {
        return mysql_affected_rows(m_mysql);
    }
    unsigned int field_count()
    {
        return mysql_field_count(m_mysql);
    }
    uint64_t insert_id()
    {
        return mysql_insert_id(m_mysql);
    }
protected:
    MYSQL* m_mysql;
    void throw_exception() { throw mysql::error(*this); }
};
#if MARIADB_VERSION_ID >= 050500
class async_connection;
#endif //MariaDB
class database : public basic_database, public qtl::base_database<database, statement>
{
public:
    database() = default;
    bool open(const char *host, const char *user, const char *passwd, const char *db,
        unsigned long clientflag = 0, unsigned int port = 0, const char *unix_socket = nullptr)
    {
        if (m_mysql == nullptr) m_mysql = mysql_init(nullptr);
        return mysql_real_connect(m_mysql, host, user, passwd, db, port, unix_socket, clientflag) != nullptr;
    }
    void close()
    {
        mysql_close(m_mysql);
        m_mysql = nullptr;
    }
    statement open_command(const char* query_text, size_t text_length)
@@ -953,33 +1071,30 @@
        return open_command(query_text.data(), query_text.length());
    }
    void simple_execute(const char* query_text, uint64_t* paffected=NULL)
    void refresh(unsigned int options)
    {
        if (mysql_refresh(m_mysql, options) < 0)
            throw_exception();
    }
    void select(const char* db)
    {
        if (mysql_select_db(m_mysql, db) != 0)
            throw_exception();
    }
    void simple_execute(const char* query_text, uint64_t* paffected = nullptr)
    {
        if(mysql_query(m_mysql, query_text)!=0)
            throw_exception();
        if(paffected) *paffected=affected_rows();
    }
    void simple_execute(const char* query_text, unsigned long text_length, uint64_t* paffected=NULL)
    void simple_execute(const char* query_text, unsigned long text_length, uint64_t* paffected = nullptr)
    {
        if(text_length==0) text_length=(unsigned long)strlen(query_text);
        if(mysql_real_query(m_mysql, query_text, text_length)!=0)
            throw_exception();
        if(paffected) *paffected=affected_rows();
    }
    uint64_t affected_rows()
    {
        return mysql_affected_rows(m_mysql);
    }
    unsigned int field_count()
    {
        return mysql_field_count(m_mysql);
    }
    uint64_t insert_id()
    {
        return mysql_insert_id(m_mysql);
    }
    void auto_commit(bool on)
@@ -1022,7 +1137,8 @@
            MYSQL_ROW row;
            while(row=mysql_fetch_row(result))
            {
                pred(*this, row, fieldCount);
                if (!pred(*this, row, fieldCount))
                    break;
            }
            mysql_free_result(result);
            return true;
@@ -1030,9 +1146,33 @@
        return false;
    }
private:
    MYSQL* m_mysql;
    void throw_exception() { throw mysql::error(*this); }
    template<typename LocalInfile>
    void set_local_infile_factory(local_infile_factory<LocalInfile>* factory)
    {
        typedef local_infile_factory<LocalInfile> factory_type;
        if (factory == nullptr)
        {
            reset_local_infile();
        }
        else
        {
            mysql_set_local_infile_handler(m_mysql, &factory_type::local_infile_init,
                &factory_type::local_infile_read,
                &factory_type::local_infile_end,
                &factory_type::local_infile_error, factory);
        }
    }
    void reset_local_infile()
    {
        mysql_set_local_infile_default(m_mysql);
    }
#if MARIADB_VERSION_ID >= 050500
    async_connection async_mode();
#endif //MariaDB
};
struct time : public MYSQL_TIME
@@ -1100,6 +1240,712 @@
    }
};
#if MARIADB_VERSION_ID >= 100000
int event_flags(int status) NOEXCEPT
{
    int flags = 0;
    if (status&MYSQL_WAIT_READ)
        flags |= event::ef_read;
    if (status&MYSQL_WAIT_WRITE)
        flags |= event::ef_write;
    if (status&MYSQL_WAIT_EXCEPT)
        flags |= event::ef_exception;
    return flags;
}
int mysql_status(int flags) NOEXCEPT
{
    int status = 0;
    if (flags&event::ef_read)
        status |= MYSQL_WAIT_READ;
    if (flags&event::ef_write)
        status |= MYSQL_WAIT_WRITE;
    if (flags&event::ef_exception)
        status |= MYSQL_WAIT_EXCEPT;
    if (flags&event::ef_timeout)
        status |= MYSQL_WAIT_TIMEOUT;
    return status;
}
class async_connection;
class async_statement : public base_statement
{
public:
    async_statement() = default;
    async_statement(async_connection& db);
    async_statement(async_statement&& src)
        : base_statement(std::move(src))
    {
        m_event=src.m_event;
        src.m_event=nullptr;
    }
    async_statement& operator=(async_statement&& src)
    {
        base_statement::operator =(std::move(src));
        m_event=src.m_event;
        src.m_event=nullptr;
        return *this;
    }
    ~async_statement()
    {
        close();
    }
    /*
        Handler defiens as:
        void handler(const qtl::mysql::error& e);
     */
    template<typename Handler>
    void open(Handler&& handler, const char *query_text, size_t text_length=0)
    {
        if(text_length==0) text_length=strlen(query_text);
        if(m_stmt)
        {
            std::string text(query_text, text_length);
            reset([this, text, handler](const mysql::error& e) mutable {
                if(e)
                {
                    handler(e);
                    return;
                }
                prepare(text.data(), text.size(), std::forward<Handler>(handler));
            });
        }
        else
        {
            prepare(query_text, text_length, std::forward<Handler>(handler));
        }
    }
    /*
        ExecuteHandler defiens as:
        void handler(const qtl::mysql::error& e, uint64_t affected);
     */
    template<typename ExecuteHandler>
    void execute(ExecuteHandler&& handler)
    {
        resize_binders(0);
        int ret=0;
        int status = mysql_stmt_execute_start(&ret, m_stmt);
        if(status)
            wait_execute(status, std::forward<ExecuteHandler>(handler));
        else if(ret)
            handler(mysql::error(*this), 0);
        else
            handler(mysql::error(), affetced_rows());
    }
    template<typename Types, typename Handler>
    void execute(const Types& params, Handler&& handler)
    {
        unsigned long count=mysql_stmt_param_count(m_stmt);
        if(count>0)
        {
            resize_binders(count);
            qtl::bind_params(*this, params);
            if(mysql_stmt_bind_param(m_stmt, &m_binders.front()))
                throw_exception();
            for(size_t i=0; i!=count; i++)
            {
                if(m_binderAddins[i].m_after_fetch)
                    m_binderAddins[i].m_after_fetch(m_binders[i]);
            }
        }
        execute(std::forward<Handler>(handler));
    }
    template<typename Types, typename RowHandler, typename FinishHandler>
    void fetch(Types&& values, RowHandler&& row_handler, FinishHandler&& finish_handler)
    {
        if(m_result==nullptr)
        {
            unsigned long count=mysql_stmt_field_count(m_stmt);
            if(count>0)
            {
                m_result=mysql_stmt_result_metadata(m_stmt);
                if(m_result==nullptr) throw_exception();
                resize_binders(count);
                qtl::bind_record(*this, std::forward<Types>(values));
                set_binders();
                if(mysql_stmt_bind_result(m_stmt, m_binders.data())!=0)
                {
                    finish_handler(mysql::error(*this));
                    return;
                }
            }
        }
        fetch(std::forward<RowHandler>(row_handler), std::forward<FinishHandler>(finish_handler));
    }
    template<typename RowHandler, typename FinishHandler>
    void fetch(RowHandler&& row_handler, FinishHandler&& finish_handler)
    {
        int ret = 0;
        int status = start_fetch(&ret);
        if (status == 0)
            status = after_fetch(status, ret, std::forward<RowHandler>(row_handler), std::forward<FinishHandler>(finish_handler));
        if (status)
            wait_fetch(status, std::forward<RowHandler>(row_handler), std::forward<FinishHandler>(finish_handler));
    }
    template<typename Handler>
    void reset(Handler&& handler)
    {
        my_bool ret=false;
        int status = mysql_stmt_reset_start(&ret, m_stmt);
        if(status)
            wait_operation<my_bool>(status, &mysql_stmt_reset_cont, handler);
        else
            handler( (ret) ? mysql::error(*this) : mysql::error());
    }
    void close()
    {
        base_statement::close();
    }
    template<typename Handler>
    void close(Handler&& handler)
    {
        if(m_result)
        {
            my_bool ret=0;
            int status = mysql_stmt_free_result_start(&ret, m_stmt);
            if(status)
            {
                this->wait_free_result(status, [this, handler](const mysql::error& e) mutable {
                    m_result=nullptr;
                    if (e)
                        handler(e);
                    else
                        close(handler);
                });
            }
            else if(ret)
                handler(mysql::error(*this));
            else
                m_result=nullptr;
        }
        if(m_stmt)
        {
            my_bool ret=0;
            int status = mysql_stmt_close_start(&ret, m_stmt);
            if(status)
            {
                wait_operation<my_bool>(status, &mysql_stmt_close_cont, [this, handler](const mysql::error& e) mutable {
                    m_stmt=nullptr;
                    handler(e);
                });;
            }
            else
            {
                m_stmt = nullptr;
                handler((ret) ? mysql::error(*this) : mysql::error());
            }
        }
    }
    template<typename Handler>
    void next_result(Handler&& handler)
    {
        if(m_result)
        {
            mysql_free_result(m_result);
            m_result = nullptr;
        }
        int ret=0;
        do
        {
            int status = mysql_stmt_next_result_start(&ret, m_stmt);
            if (status)
            {
                wait_next_result(status, std::forward<Handler>(handler));
                return;
            }
        }while(ret==0 && mysql_stmt_field_count(m_stmt)<=0);
        handler((ret) ? mysql::error(*this) : mysql::error());
    }
private:
    event* m_event;
    template<typename Handler>
    void prepare(const char *query_text, size_t text_length, Handler&& handler)
    {
        int ret;
        int status=mysql_stmt_prepare_start(&ret, m_stmt, query_text, (unsigned long)text_length);
        if(status)
            wait_operation<int>(status, &mysql_stmt_prepare_cont, handler);
        else
            handler((ret) ? mysql::error(*this) : mysql::error());
    }
    template<typename Ret, typename Func, typename Handler>
    void wait_operation(int status, Func func, Handler&& handler)
    {
        m_event->set_io_handler(event_flags(status), mysql_get_timeout_value(m_stmt->mysql),
            [this, func, handler](int flags) mutable {
                Ret ret = 0;
                int status = func(&ret, m_stmt, mysql_status(flags));
                if (status)
                    wait_operation<Ret>(status, func, handler);
                else
                    handler((ret) ? mysql::error(*this) : mysql::error());
        });
    }
    template<typename ExecuteHandler>
    void wait_execute(int status, ExecuteHandler&& handler)
    {
        m_event->set_io_handler(event_flags(status), mysql_get_timeout_value(m_stmt->mysql),
            [this, handler](int flags) mutable {
                int ret = 0;
                int status = mysql_stmt_execute_cont(&ret, m_stmt, mysql_status(flags));
                if (status)
                    wait_execute(status, handler);
                else if(ret)
                    handler(mysql::error(*this), 0);
                else
                    handler(mysql::error(), affetced_rows());
        });
    }
    template<typename RowHandler, typename FinishHandler>
    void wait_fetch(int status, RowHandler&& row_handler, FinishHandler&& finish_handler)
    {
        m_event->set_io_handler(event_flags(status), mysql_get_timeout_value(m_stmt->mysql),
            [this, row_handler, finish_handler](int flags) mutable {
                int ret = 0;
                int status = mysql_stmt_fetch_cont(&ret, m_stmt, mysql_status(flags));
                if (status == 0)
                    status = after_fetch(status, ret, std::forward<RowHandler>(row_handler), std::forward<FinishHandler>(finish_handler));
                if (status)
                    wait_fetch(status, std::forward<RowHandler>(row_handler), std::forward<FinishHandler>(finish_handler));
        });
    }
    int start_fetch(int* ret)
    {
        for (size_t i = 0; i != m_binders.size(); i++)
        {
            if (m_binderAddins[i].m_before_fetch)
                m_binderAddins[i].m_before_fetch(m_binders[i]);
        }
        return mysql_stmt_fetch_start(ret, m_stmt);
    }
    template<typename RowHandler, typename FinishHandler>
    int after_fetch(int status, int ret, RowHandler&& row_handler, FinishHandler&& finish_handler)
    {
        while (status == 0)
        {
            if (ret == 0 || ret == MYSQL_DATA_TRUNCATED)
            {
                for (size_t i = 0; i != m_binders.size(); i++)
                {
                    m_binderAddins[i].is_truncated = (ret == MYSQL_DATA_TRUNCATED);
                    if (m_binderAddins[i].m_after_fetch)
                        m_binderAddins[i].m_after_fetch(m_binders[i]);
                }
                if (row_handler())
                    status = start_fetch(&ret);
                else
                {
                    finish_handler(mysql::error());
                    break;
                }
            }
            else if (ret == 1)
            {
                finish_handler(mysql::error(*this));
                break;
            }
            else
            {
                finish_handler(mysql::error());
                break;
            }
        };
        return status;
    }
    template<typename ResultHandler>
    void wait_free_result(int status, ResultHandler&& handler) NOEXCEPT
    {
        m_event->set_io_handler(event_flags(status), mysql_get_timeout_value(m_stmt->mysql),
            [this, handler](int flags) mutable {
                my_bool ret = false;
                int status = mysql_stmt_free_result_cont(&ret, m_stmt, mysql_status(flags));
                if (status)
                    wait_free_result(status, handler);
                else
                    handler(mysql::error());
        });
    }
    template<typename Handler>
    void wait_next_result(int status, Handler&& handler)
    {
        m_event->set_io_handler(event_flags(status), mysql_get_timeout_value(m_stmt->mysql),
            [this, handler](int flags) mutable {
            int ret = 0;
            int status = mysql_stmt_next_result_cont(&ret, m_stmt, mysql_status(flags));
            if (status)
                wait_next_result(status, handler);
            else if (ret)
                handler(mysql::error(*this));
            else if (mysql_stmt_field_count(m_stmt) > 0)
                handler(mysql::error());
            else
                next_result(std::forward<Handler>(handler));
        });
    }
};
class async_connection : public basic_database, public qtl::async_connection<async_connection, async_statement>
{
public:
    async_connection() = default;
    /*
        OpenHandler defines as:
            void handler(const qtl::mysql::error& e) NOEXCEPT;
    */
    template<typename EventLoop, typename OpenHandler>
    void open(EventLoop& ev, OpenHandler&& handler, const char *host, const char *user, const char *passwd, const char *db,
        unsigned long clientflag = 0, unsigned int port = 0, const char *unix_socket = nullptr)
    {
        if (m_mysql == nullptr)
            m_mysql = mysql_init(nullptr);
        mysql_options(m_mysql, MYSQL_OPT_NONBLOCK, 0);
        MYSQL* ret = nullptr;
        int status = mysql_real_connect_start(&ret, m_mysql, host, user, passwd, db, port, unix_socket, clientflag);
        bind(ev);
        if (status)
            wait_connect(status, std::forward<OpenHandler>(handler));
        else
            handler((ret == nullptr) ? mysql::error(*this) : mysql::error());
    }
    /*
        CloseHandler defines as:
            void handler() NOEXCEPT;
    */
    template<typename CloseHandler >
    void close(CloseHandler&& handler) NOEXCEPT
    {
        int status  = mysql_close_start(m_mysql);
        if (status)
        {
            wait_close(status, [this, handler]() {
                handler();
                m_mysql = nullptr;
            });
        }
        else
        {
            handler();
            m_mysql = nullptr;
        }
    }
    /*
        Handler defines as:
            void handler(const qtl::mysql::error& e) NOEXCEPT;
    */
    template<typename Handler>
    void refresh(Handler&& handler, unsigned int options) NOEXCEPT
    {
        int ret = 0;
        int status = mysql_refresh_start(&ret, m_mysql, options);
        if (status)
            wait_operation(status, &mysql_refresh_cont, std::forward<Handler>(handler));
        else
            handler((ret) ? mysql::error(*this) : mysql::error());
    }
    template<typename Handler>
    void select(Handler&& handler, const char* db) NOEXCEPT
    {
        int ret = 0;
        int status = mysql_select_db_start(&ret, m_mysql, db);
        if (status)
            wait_operation(status, &mysql_select_db_cont, std::forward<Handler>(handler));
        else
            handler((ret) ? mysql::error(*this) : mysql::error());
    }
    template<typename Handler>
    void is_alive(Handler&& handler) NOEXCEPT
    {
        int ret = 0;
        int status = mysql_ping_start(&ret, m_mysql);
        if (status)
            wait_operation(status, &mysql_ping_cont, std::forward<Handler>(handler));
        else
            handler((ret) ? mysql::error(*this) : mysql::error());
    }
    /*
        Handler defines as:
            void handler(const qtl::mysql::error& e, uint64_t affected) NOEXCEPT;
    */
    template<typename ExecuteHandler>
    void simple_execute(ExecuteHandler&& handler, const char* query_text) NOEXCEPT
    {
        int ret = 0;
        int status = mysql_query_start(&ret, m_mysql, query_text);
        if (status)
        {
            wait_operation(status, &mysql_query_cont, [this, handler](const mysql::error& e) {
                uint64_t affected = 0;
                if (!e) affected = affected_rows();
                handler(e, affected);
            });
        }
        else
        {
            uint64_t affected = 0;
            if (ret >= 0) affected = affected_rows();
            handler((ret) ? mysql::error(*this) : mysql::error(), affected);
        }
    }
    template<typename ExecuteHandler>
    void simple_execute(ExecuteHandler&& handler, const char* query_text, unsigned long text_length) NOEXCEPT
    {
        int ret = 0;
        int status = mysql_real_query_start(&ret, m_mysql, query_text, text_length);
        if (status)
        {
            wait_operation(status, &mysql_real_query_cont, [this, handler](const mysql::error& e) mutable {
                uint64_t affected = 0;
                if (!e) affected = affected_rows();
                handler(e, affected);
            });
        }
        else
        {
            uint64_t affected = 0;
            if (ret >= 0) affected = affected_rows();
            handler((ret) ? mysql::error(*this) : mysql::error(), affected);
        }
    }
    template<typename Handler>
    void auto_commit(Handler&& handler, bool on) NOEXCEPT
    {
        my_bool ret;
        int status = mysql_autocommit_start(&ret, m_mysql, on ? 1 : 0);
        if(status)
            wait_operation(status, &mysql_autocommit_cont, std::forward<Handler>(handler));
        else
            handler((ret) ? mysql::error(*this) : mysql::error());
    }
    template<typename Handler>
    void begin_transaction(Handler&& handler) NOEXCEPT
    {
        auto_commit(handler, false);
    }
    template<typename Handler>
    void rollback(Handler&& handler) NOEXCEPT
    {
        my_bool ret;
        int status = mysql_rollback_start(&ret, m_mysql);
        if (status)
            wait_operation(status, &mysql_rollback_cont, std::forward<Handler>(handler));
        else
            handler((ret) ? mysql::error(*this) : mysql::error());
    }
    template<typename Handler>
    void commit(Handler&& handler) NOEXCEPT
    {
        my_bool ret;
        int status = mysql_commit_start(&ret, m_mysql);
        if (status)
            wait_operation(status, &mysql_commit_cont, std::forward<Handler>(handler));
        else
            handler((ret) ? mysql::error(*this) : mysql::error());
    }
    /*
        RowHandler defines as:
            bool row_handler(MYSQL_ROW row, int field_count) NOEXCEPT;
        ResultHandler defines as:
            void result_handler(const qtl::mysql::error& e, size_t row_count) NOEXCEPT;
    */
    template<typename RowHandler, typename ResultHandler>
    void simple_query(const char* query, unsigned long length, RowHandler&& row_handler, ResultHandler&& result_handler) NOEXCEPT
    {
        simple_execute([this, row_handler, result_handler](const mysql::error& e, uint64_t affected) mutable {
            if (e)
            {
                result_handler(e, 0);
                return;
            }
            unsigned int field_count = mysql_field_count(m_mysql);
            if (field_count > 0)
            {
                MYSQL_RES* result = nullptr;
                int status = mysql_store_result_start(&result, m_mysql);
                if (status)
                    wait_query(status, field_count, row_handler, result_handler);
                else if (result)
                    fetch_rows(result, field_count, 0, row_handler, result_handler);
                else
                    result_handler(mysql::error(*this), 0);
            }
            else
            {
                result_handler(mysql::error(), 0);
            }
        }, query, length);
    }
    template<typename Handler>
    void open_command(const char* query_text, size_t text_length, Handler&& handler)
    {
        std::shared_ptr<async_statement> stmt=std::make_shared<async_statement>(*this);
        stmt->open([stmt, handler](const mysql::error& e) mutable {
            handler(e, stmt);
        }, query_text, (unsigned long)text_length);
    }
    // Returns the socket that mysql_get_socket reports for this connection's handle.
    socket_type socket() const NOEXCEPT { return mysql_get_socket(m_mysql); }
private:
    template<typename OpenHandler>
    void wait_connect(int status, OpenHandler&& handler) NOEXCEPT
    {
        m_event_handler->set_io_handler(event_flags(status), mysql_get_timeout_value(m_mysql),
            [this, handler](int flags) mutable {
            MYSQL* ret = nullptr;
            int status = mysql_real_connect_cont(&ret, m_mysql, mysql_status(flags));
            if (status)
                wait_connect(status, handler);
            else
                handler((ret == nullptr) ? mysql::error(*this) : mysql::error());
        });
    }
    template<typename CloseHandler>
    void wait_close(int status, CloseHandler&& handler) NOEXCEPT
    {
        m_event_handler->set_io_handler(event_flags(status), mysql_get_timeout_value(m_mysql),
            [this, handler](int flags) {
            MYSQL* ret = nullptr;
            int status = mysql_close_cont(m_mysql, mysql_status(flags));
            if (status)
                wait_close(status, handler);
            else
                handler();
        });
    }
    template<typename Func, typename Handler>
    void wait_operation(int status, Func func, Handler&& handler) NOEXCEPT
    {
        m_event_handler->set_io_handler(event_flags(status), mysql_get_timeout_value(m_mysql),
            [this, func, handler](int flags) mutable {
            int ret = 0;
            int status = func(&ret, m_mysql, mysql_status(flags));
            if (status)
                wait_operation(status, func, handler);
            else
                handler((ret) ? mysql::error(*this) : mysql::error());
        });
    }
    template<typename RowHandler, typename ResultHandler>
    void wait_query(int status, int field_count, RowHandler&& row_handler, ResultHandler&& result_handler) NOEXCEPT
    {
        m_event_handler->set_io_handler(event_flags(status), mysql_get_timeout_value(m_mysql),
            [this, field_count, row_handler, result_handler](int flags) {
            MYSQL_RES* result = 0;
            int status = mysql_store_result_cont(&result, m_mysql, mysql_status(flags));
            if (status)
                wait_query(status, field_count, row_handler, result_handler);
            else if (result)
                fetch_rows(result, field_count, 0, row_handler, result_handler);
            else
                result_handler(mysql::error(*this), 0);
        });
    }
    template<typename RowHandler, typename ResultHandler>
    void fetch_rows(MYSQL_RES* result, int field_count, size_t row_count, RowHandler&& row_handler, ResultHandler&& result_handler) NOEXCEPT
    {
        MYSQL_ROW row;
        int status = mysql_fetch_row_start(&row, result);
        if (status)
            wait_fetch(status, result, field_count, row_count, row_handler, result_handler);
        else if(row && row_handler(row, field_count))
            fetch_rows(result, field_count, row_count + 1, row_handler, result_handler);
        else
            free_result(result, row_count, result_handler);
    }
    template<typename ResultHandler>
    void free_result(MYSQL_RES* result, size_t row_count, ResultHandler&& result_handler) NOEXCEPT
    {
        int status = mysql_free_result_start(result);
        if (status)
            wait_free_result(status, result, row_count, result_handler);
        else
            result_handler(mysql::error(), row_count);
    }
    template<typename RowHandler, typename ResultHandler>
    void wait_fetch(int status, MYSQL_RES* result, int field_count,  size_t row_count, RowHandler&& row_handler, ResultHandler&& result_handler)
    {
        m_event_handler->set_io_handler(event_flags(status), mysql_get_timeout_value(m_mysql),
            [this, result, field_count, row_count, row_handler, result_handler](int flags) mutable {
            MYSQL_ROW row;
            int status = mysql_fetch_row_cont(&row, result, mysql_status(flags));
            if (status)
                wait_fetch(status, result, field_count, row_count, row_handler, result_handler);
            else if (result && row_handler(row, field_count))
                fetch_rows(result, field_count, row_count+1, row_handler, result_handler);
            else
                free_result(result, row_count, result_handler);
        });
    }
    template<typename ResultHandler>
    void wait_free_result(int status, MYSQL_RES* result, size_t row_count, ResultHandler&& handler) NOEXCEPT
    {
        m_event_handler->set_io_handler(event_flags(status), mysql_get_timeout_value(m_mysql),
            [this, result, row_count, handler](int flags) mutable {
            MYSQL* ret = nullptr;
            int status = mysql_free_result_cont(result, mysql_status(flags));
            if (status)
                wait_free_result(status, result, row_count, handler);
            else
                handler(mysql::error(), row_count);
        });
    }
};
// Constructs an asynchronous statement for the given connection: the base statement is
// built from the connection viewed as a basic_database, and the value returned by the
// connection's event() is stored in m_event.
inline     async_statement::async_statement(async_connection& db)
: base_statement(static_cast<basic_database&>(db))
{
    m_event=db.event();
}
#endif //MariaDB 10.0
typedef qtl::transaction<database> transaction;
template<typename Record>
@@ -1116,28 +1962,28 @@
    return stmt;
}
inline error::error(statement& stmt)
inline error::error(const base_statement& stmt)
{
    const char* errmsg=stmt.errmsg();
    m_error=stmt.error();
    if(errmsg) m_errmsg=errmsg;
}
inline error::error(database& db)
inline error::error(const basic_database& db)
{
    const char* errmsg=db.errmsg();
    m_error=db.error();
    if(errmsg) m_errmsg=errmsg;
}
inline statement::statement(database& db)
inline base_statement::base_statement(basic_database& db)
{
    m_stmt=mysql_stmt_init(db.handle());
    m_result=NULL;
    m_result=nullptr;
}
}
}
#endif //_MYDTL_MYSQL_H_
#endif //_QTL_MYSQL_H_
include/qtl_mysql_pool.hpp
@@ -18,11 +18,14 @@
    virtual database* new_database() throw() override
    {
        database* db=new database;
        db->charset_name("utf8");
        if(!db->open(m_host.data(), m_user.data(), m_password.data(), m_database.data(), 0, m_port))
        {
            delete db;
            db=NULL;
        }
        else
        {
            db->charset_name("utf8");
        }
        return db;
    }
@@ -35,6 +38,44 @@
    std::string m_password;
};
// The version literal must not start with 0: a leading zero makes it octal
// (050500 == 20800), so the intended 5.5.0 threshold (50500) was not being tested.
#if MARIADB_VERSION_ID >= 50500
// Pool of asynchronous MySQL connections driven by a user-supplied event loop.
template<typename EventLoop>
class async_pool : public qtl::async_pool<async_pool<EventLoop>, EventLoop, async_connection>
{
    typedef qtl::async_pool<async_pool<EventLoop>, EventLoop, async_connection> base_class;
public:
    async_pool(EventLoop& ev) : base_class(ev), m_port(0) { }
    virtual ~async_pool() { }
    /*
        Opens a new connection with the stored host, user, password, database and port.
        The handler is called as handler(error, async_connection*): on failure the
        connection is deleted and a null pointer is passed; on success the connection
        has its character set set to utf8 first.
        NOTE(review): on failure the connection is deleted from inside its own open
        callback - confirm that nothing touches it after the callback returns.
    */
    template<typename Handler>
    void new_connection(EventLoop& ev, Handler&& handler) throw()
    {
        async_connection* db = new async_connection;
        db->open(ev, [handler, db](const mysql::error& e) mutable {
            if (e)
            {
                delete db;
                db = nullptr;
            }
            else
            {
                db->charset_name("utf8");
            }
            handler(e, db);
        }, m_host.data(), m_user.data(), m_password.data(), m_database.data(), 0, m_port);
    }
protected:
    // Connection parameters used by new_connection.
    std::string m_host;
    unsigned short m_port;
    std::string m_database;
    std::string m_user;
    std::string m_password;
};
#endif //MariaDB
}
}
include/qtl_odbc.hpp
@@ -33,7 +33,7 @@
    error(const object<Type>& h, SQLINTEGER code);
    error(SQLINTEGER code, const char* msg) : m_errno(code), m_errmsg(msg) { }
    SQLINTEGER code() const { return m_errno; }
    operator bool() { return m_errno==SQL_SUCCESS || m_errno==SQL_SUCCESS_WITH_INFO; }
    operator bool() { return m_errno!=SQL_SUCCESS || m_errno!=SQL_SUCCESS_WITH_INFO; }
    virtual const char* what() const throw() override { return m_errmsg.data(); }
private:
    SQLINTEGER m_errno;
test/TestMariaDB.cpp
New file
@@ -0,0 +1,421 @@
#include "stdafx.h"
#include "../include/qtl_mysql.hpp"
#include <vector>
#include <thread>
#include <system_error>
#include <time.h>
#include <limits.h>
#include "../include/qtl_mysql_pool.hpp"
#if MARIADB_VERSION_ID < 0100000
#error "The program need mariadb version > 10.0"
#endif
class simple_event_loop
{
public:
    simple_event_loop()
    {
        m_expired = 0;
        m_maxfd = 0;
        m_stoped = false;
        FD_ZERO(&m_readset);
        FD_ZERO(&m_writeset);
        FD_ZERO(&m_exceptset);
    }
    void reset()
    {
        m_stoped = false;
    }
    void run()
    {
        while (!m_stoped)
            do_once(nullptr);
    }
    void run_for(long seconds)
    {
        time_t expired, now;
        time(&now);
        expired = now + seconds;
        timeval tv = { seconds };
        do
        {
            do_once(&tv);
            time(&now);
            tv.tv_sec = expired - now;
            tv.tv_usec = 0;
        } while (!m_stoped && now < expired);
    }
    void stop()
    {
        m_stoped = true;
    }
private:
    class event_item : public qtl::event
    {
    public:
        event_item(simple_event_loop* loop, qtl::socket_type fd)
            : m_loop(loop), m_fd(fd), m_expired(LONG_MAX)
        {
        }
        virtual void set_io_handler(int flags, long timeout, std::function<void(int)>&& handler) NOEXCEPT override
        {
            if (timeout > 0)
            {
                time_t now;
                time(&now);
                if (m_expired > now + timeout)
                    m_expired = now + timeout;
                m_loop->set(this, flags, m_expired);
            }
            else
            {
                m_expired = LONG_MAX;
                m_loop->set(this, flags, 0);
            }
            m_io_handler = std::forward<std::function<void(int)>>(handler);
        }
        virtual void remove() override
        {
            m_loop->remove(this);
        }
        virtual bool is_busying() override
        {
            return m_io_handler!=nullptr;
        }
        simple_event_loop* m_loop;
        qtl::socket_type m_fd;
        time_t m_expired;
        std::function<void(int)> m_io_handler;
    };
    std::vector<std::unique_ptr<event_item>> m_events;
    //implement for QTL
public:
    template<typename Connection>
    event_item* add(Connection* connection)
    {
        qtl::socket_type fd = connection->socket();
        event_item* result = new event_item(this, fd);
        std::unique_ptr<event_item> item(result);
        m_events.emplace_back(std::move(item));
        return result;
    }
    template<typename Handler>
    event_item* set_timeout(const timeval& timeout, Handler&& handler)
    {
        event_item* result = new event_item(this, 0);
        std::unique_ptr<event_item> item(result);
        ::time(&result->m_expired);
        result->m_expired+=timeout.tv_sec;
        if(m_expired>result->m_expired)
            m_expired=result->m_expired;
        m_events.emplace_back(std::move(item));
        return result;
    }
public:
    void set(event_item* item, int flags, time_t expired)
    {
        if (item->m_fd + 1 > m_maxfd)
            m_maxfd = item->m_fd + 1;
        if (flags&qtl::event::ef_read)
            FD_SET(item->m_fd, &m_readset);
        if (flags&qtl::event::ef_write)
            FD_SET(item->m_fd, &m_writeset);
        if (flags&qtl::event::ef_exception)
            FD_SET(item->m_fd, &m_exceptset);
        if (expired > 0 && m_expired > expired)
            m_expired = expired;
    }
    void remove(event_item* item)
    {
        auto it = m_events.begin();
        while (it != m_events.end())
        {
            if (it->get() == item)
            {
                m_events.erase(it);
                return;
            }
            ++it;
        }
    }
private:
    fd_set m_readset, m_writeset, m_exceptset;
    qtl::socket_type m_maxfd;
    int do_once(timeval* timeout)
    {
        fd_set rs = m_readset, ws = m_writeset, es = m_exceptset;
        timeval tv = { INT_MAX, 0 };
        time_t now;
        time(&now);
        if (timeout)
        {
            tv = *timeout;
            timeout = &tv;
        }
        long d = m_expired - now;
        if (d > 0 && tv.tv_sec > d)
        {
            tv.tv_sec = d;
            timeout = &tv;
        }
        qtl::socket_type maxfd = m_maxfd;
        FD_ZERO(&m_readset);
        FD_ZERO(&m_writeset);
        FD_ZERO(&m_exceptset);
        m_maxfd = 0;
        if (maxfd == 0)
        {
            if (timeout)
            {
                std::this_thread::sleep_for(std::chrono::microseconds(timeout->tv_sec*std::micro::den + timeout->tv_usec));
                check_timeout();
            }
            else
            {
                std::this_thread::sleep_for(std::chrono::seconds(1));
            }
            return 0;
        }
        else
        {
            int ret = select(maxfd, &rs, &ws, &es, timeout);
            if (ret > 0)
            {
                for (auto& item : m_events)
                {
                    int flags = 0;
                    if (FD_ISSET(item->m_fd, &rs))
                        flags |= qtl::event::ef_read;
                    if (FD_ISSET(item->m_fd, &ws))
                        flags |= qtl::event::ef_write;
                    if (FD_ISSET(item->m_fd, &es))
                        flags |= qtl::event::ef_exception;
                    if (flags && item->m_io_handler)
                    {
                        auto handler = std::move(item->m_io_handler);
                        handler(flags);
                    }
                }
            }
            else if (ret == 0)
            {
                time(&now);
                for (auto& item : m_events)
                {
                    if (item->m_expired > 0 && item->m_io_handler && item->m_expired < now)
                    {
                        item->m_io_handler(qtl::event::ef_timeout);
                        item->m_expired = 0;
                        item->m_io_handler = nullptr;
                    }
                }
            }
            else if (ret < 0)
            {
                int errc;
#ifdef _WIN32
                errc = WSAGetLastError();
#else
                errc = errno;
#endif
                throw std::system_error(errc, std::system_category());
            }
            return ret;
        }
    }
    void check_timeout()
    {
        time_t now;
        time(&now);
        for (auto& item : m_events)
        {
            if (item->m_expired > 0 && item->m_io_handler && item->m_expired < now)
            {
                item->m_io_handler(qtl::event::ef_timeout);
                item->m_expired = 0;
                item->m_io_handler = nullptr;
            }
        }
    }
    time_t m_expired;
    bool m_stoped;
};
// Event loop instance shared by all tests in this file.
simple_event_loop ev;
// The tests refer to the qtl::mysql names (async_connection, error, ...) unqualified.
using namespace qtl::mysql;
// Prints the error's code() and what() text to stderr.
void LogError(const error& e)
{
    fprintf(stderr, "MySQL Error(%d): %s\n", e.code(), e.what());
}
// Connection parameters passed to connection.open() by every test below.
const char mysql_server[]="localhost";
const char mysql_user[]="root";
const char mysql_password[]="";
const char mysql_database[]="test";
void SimpleTest()
{
    async_connection connection;
    ev.reset();
    connection.open(ev, [&connection](const error& e) {
        if (e)
        {
            LogError(e);
            ev.stop();
        }
        else
        {
            printf("Connect to mysql ok.\n");
            connection.simple_query("select * from test", 0, [](MYSQL_ROW row, int field_count) {
                for (int i = 0; i != field_count; i++)
                    printf("%s\t", row[i]);
                printf("\n");
                return true;
            }, [&connection](const error& e, size_t row_count) {
                if (e)
                    LogError(e);
                else
                    printf("Total %lu rows.\n", row_count);
                connection.close([]() {
                    printf("Close connection ok.\n");
                    ev.stop();
                });
            });
        }
    }, mysql_server, mysql_user, mysql_password, mysql_database);
    ev.run();
}
void ExecuteTest()
{
    async_connection connection;
    ev.reset();
    connection.open(ev, [&connection](const error& e) {
        if (e)
        {
            LogError(e);
            ev.stop();
        }
        else
        {
            printf("Connect to mysql ok.\n");
            connection.execute([&connection](const error& e, uint64_t affected) mutable {
                    if (e)
                        LogError(e);
                    else
                        printf("Insert %llu records ok.\n", affected);
                    connection.close([]() {
                        printf("Close connection ok.\n");
                        ev.stop();
                    });
            }, "insert into test(name, createtime, company) values(?, now(), ?)", 0, std::make_tuple("test name", "test company"));
        }
    }, mysql_server, mysql_user, mysql_password, mysql_database);
    ev.run();
};
void QueryTest()
{
    async_connection connection;
    ev.reset();
    connection.open(ev, [&connection](const error& e) {
        if (e)
        {
            LogError(e);
            ev.stop();
        }
        else
        {
            printf("Connect to mysql ok.\n");
            connection.query("select id, name, CreateTime, Company from test", 0,
                [](int64_t id, const std::string& name, const qtl::mysql::time& create_time, const std::string& company) {
                char szTime[128] = { 0 };
                if (create_time.year != 0)
                {
                    struct tm tm;
                    create_time.as_tm(tm);
                    strftime(szTime, 128, "%c", &tm);
                }
                printf("%lld\t%s\t%s\t%s\n", id, name.data(), szTime, company.data());
            }, [&connection](const error& e) {
                printf("query has completed.\n");
                if (e)
                    LogError(e);
                connection.close([]() {
                    printf("Close connection ok.\n");
                    ev.stop();
                });
            });
        }
    }, mysql_server, mysql_user, mysql_password, mysql_database);
    ev.run();
}
// Connects and calls the stored procedure test_proc through query_multi. Three callbacks
// are passed: the first receives the final error state and closes the connection, the other
// two print rows - one taking (uint32_t, string), the other taking a time value.
// NOTE(review): presumably each row callback serves one result set of the procedure, in
// order - confirm against query_multi.
void MultiQueryTest()
{
    async_connection connection;
    ev.reset();
    connection.open(ev, [&connection](const error& e) {
        if (e)
        {
            LogError(e);
            ev.stop();
        }
        else
        {
            printf("Connect to mysql ok.\n");
            connection.query_multi("call test_proc",
                [&connection](const error& e) {
                // Completion callback: report any error, then close and stop the loop.
                if (e)
                    LogError(e);
                connection.close([]() {
                    printf("Close connection ok.\n");
                    ev.stop();
                });
            }, [](uint32_t i, const std::string& str) {
                printf("0=\"%d\", 'hello world'=\"%s\"\n", i, str.data());
            }, [](const qtl::mysql::time& time) {
                struct tm tm;
                time.as_tm(tm);
                printf("current time is: %s\n", asctime(&tm));
            });
        }
    }, mysql_server, mysql_user, mysql_password, mysql_database);
    ev.run();
}
int main(int argc, char* argv[])
{
    ExecuteTest();
    SimpleTest();
    QueryTest();
    MultiQueryTest();
    return 0;
}
test/TestMysql.cpp
@@ -291,7 +291,7 @@
    try
    {
        db.query("SELECT Data from test_stream", [](qtl::mysql::blobbuf& buf) {
        db.query("SELECT Data from test_stream", [](qtl::mysql::blobbuf&& buf) {
            istream s(&buf);
            string str;
            while (!s.eof())
test/test_mariadb.mak
New file
@@ -0,0 +1,22 @@
# Builds the test_mariadb program from TestMariaDB.cpp using a precompiled stdafx.h.
TARGET=test_mariadb
CC=g++
PCH_HEADER=stdafx.h
PCH=stdafx.h.gch
OBJ=TestMariaDB.o
# Include paths for the MariaDB client headers and the QTL headers in ../include.
CFLAGS=-g -D_DEBUG -O2 -I/usr/include/mariadb -I/usr/local/include
CXXFLAGS=-I../include -std=c++11
LDFLAGS= -L/usr/local/lib -L/usr/local/mariadb/lib -lmariadb
all : $(TARGET)
# Precompiled header, compiled with the same flags as the sources.
$(PCH) : $(PCH_HEADER)
    $(CC) $(CFLAGS) $(CXXFLAGS) -x c++-header -o $@ $<
TestMariaDB.o : TestMariaDB.cpp $(PCH)
    $(CC) -c $(CFLAGS) $(CXXFLAGS) -o $@ $<
# Link step, run through libtool.
# NOTE(review): the libraries in LDFLAGS precede the object files on the link line; some
# linkers resolve left to right - confirm this order works on the target toolchain.
$(TARGET) : $(OBJ)
    libtool --tag=CXX --mode=link $(CC) $(LDFLAGS) -o $@ $^
# Removes the executable, the precompiled header and the object files.
clean:
    rm $(TARGET) $(PCH) $(OBJ) -f
test/test_mysql.mak
@@ -3,8 +3,8 @@
PCH_HEADER=stdafx.h
PCH=stdafx.h.gch
OBJ=TestMysql.o md5.o
CFLAGS=-g -D_DEBUG -O2 -I/usr/include/mysql -I/usr/local/include -I/usr/local/mysql/include
CXXFLAGS=-I../include -std=c++11
CFLAGS=-g -D_DEBUG -O2 -I../include -I/usr/include/mysql -I/usr/local/include -I/usr/local/mysql/include
CXXFLAGS=-std=c++11
LDFLAGS= -L/usr/local/lib -L/usr/local/mysql/lib -lcpptest -lmysqlclient
all : $(TARGET)
test/test_sqlite.mak
@@ -3,20 +3,19 @@
PCH_HEADER=stdafx.h
PCH=stdafx.h.gch
OBJ=TestSqlite.o sqlite3.o md5.o
CFLAGS=-g -D_DEBUG -O2 -I. -I/usr/local/include
CXXFLAGS=-I../include -std=c++11
LDFLAGS= -L/usr/local/lib -pthread -ldl -lcpptest
CFLAGS=-g -D_DEBUG -O2 -I. -I../include -I/usr/local/include -std=c++11
LDFLAGS= -L/usr/local/lib -ldl -lcpptest -lpthread
all : $(TARGET)
$(PCH) : $(PCH_HEADER)
    $(CC) $(CFLAGS) $(CXXFLAGS) -x c++-header -o $@ $<
    $(CC) $(CFLAGS) -x c++-header -o $@ $<
TestSqlite.o : $(PCH) TestSqlite.cpp TestSqlite.h
    $(CC) -c $(CFLAGS) $(CXXFLAGS) -o $@ TestSqlite.cpp
    $(CC) -c $(CFLAGS) -o $@ TestSqlite.cpp
    
sqlite3.o : sqlite3.c sqlite3.h
    gcc -c $(CFLAGS) -o $@ $<
sqlite3.o : sqlite3.c
    gcc -c -g -O2 -I../include -o $@ $^
    
md5.o : md5.c md5.h
    gcc -c $(CFLAGS) -o $@ $<
test/vs/test.sln
@@ -7,6 +7,8 @@
EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "test_odbc", "test_odbc\test_odbc.vcproj", "{86C60EAD-F5B4-482B-9CC2-92A77323E759}"
EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "test_mariadb", "test_mariadb\test_mariadb.vcproj", "{2A539BFA-B934-4192-A029-524593AEC6B8}"
EndProject
Global
    GlobalSection(SolutionConfigurationPlatforms) = preSolution
        Debug|Win32 = Debug|Win32
@@ -25,6 +27,10 @@
        {86C60EAD-F5B4-482B-9CC2-92A77323E759}.Debug|Win32.Build.0 = Debug|Win32
        {86C60EAD-F5B4-482B-9CC2-92A77323E759}.Release|Win32.ActiveCfg = Release|Win32
        {86C60EAD-F5B4-482B-9CC2-92A77323E759}.Release|Win32.Build.0 = Release|Win32
        {2A539BFA-B934-4192-A029-524593AEC6B8}.Debug|Win32.ActiveCfg = Debug|Win32
        {2A539BFA-B934-4192-A029-524593AEC6B8}.Debug|Win32.Build.0 = Debug|Win32
        {2A539BFA-B934-4192-A029-524593AEC6B8}.Release|Win32.ActiveCfg = Release|Win32
        {2A539BFA-B934-4192-A029-524593AEC6B8}.Release|Win32.Build.0 = Release|Win32
    EndGlobalSection
    GlobalSection(SolutionProperties) = preSolution
        HideSolutionNode = FALSE
test/vs/test_mariadb/test_mariadb.vcproj
New file
@@ -0,0 +1,225 @@
<?xml version="1.0" encoding="gb2312"?>
<VisualStudioProject
    ProjectType="Visual C++"
    Version="8.00"
    Name="test_mariadb"
    ProjectGUID="{2A539BFA-B934-4192-A029-524593AEC6B8}"
    RootNamespace="test_mariadb"
    Keyword="Win32Proj"
    >
    <Platforms>
        <Platform
            Name="Win32"
        />
    </Platforms>
    <ToolFiles>
    </ToolFiles>
    <Configurations>
        <Configuration
            Name="Debug|Win32"
            OutputDirectory="$(SolutionDir)$(ConfigurationName)"
            IntermediateDirectory="$(ConfigurationName)"
            ConfigurationType="1"
            CharacterSet="1"
            >
            <Tool
                Name="VCPreBuildEventTool"
            />
            <Tool
                Name="VCCustomBuildTool"
            />
            <Tool
                Name="VCXMLDataGeneratorTool"
            />
            <Tool
                Name="VCWebServiceProxyGeneratorTool"
            />
            <Tool
                Name="VCMIDLTool"
            />
            <Tool
                Name="VCCLCompilerTool"
                Optimization="0"
                AdditionalIncludeDirectories="D:\mariadb\include\mysql"
                PreprocessorDefinitions="WIN32;_DEBUG;_CONSOLE;_CRT_SECURE_NO_WARNINGS"
                MinimalRebuild="true"
                BasicRuntimeChecks="3"
                RuntimeLibrary="3"
                UsePrecompiledHeader="2"
                WarningLevel="3"
                DebugInformationFormat="4"
            />
            <Tool
                Name="VCManagedResourceCompilerTool"
            />
            <Tool
                Name="VCResourceCompilerTool"
            />
            <Tool
                Name="VCPreLinkEventTool"
            />
            <Tool
                Name="VCLinkerTool"
                AdditionalDependencies="libmariadb.lib ws2_32.lib"
                LinkIncremental="2"
                AdditionalLibraryDirectories="D:\mariadb\lib"
                GenerateDebugInformation="true"
                SubSystem="1"
                TargetMachine="1"
            />
            <Tool
                Name="VCALinkTool"
            />
            <Tool
                Name="VCManifestTool"
            />
            <Tool
                Name="VCXDCMakeTool"
            />
            <Tool
                Name="VCBscMakeTool"
            />
            <Tool
                Name="VCFxCopTool"
            />
            <Tool
                Name="VCAppVerifierTool"
            />
            <Tool
                Name="VCWebDeploymentTool"
            />
            <Tool
                Name="VCPostBuildEventTool"
            />
        </Configuration>
        <Configuration
            Name="Release|Win32"
            OutputDirectory="$(SolutionDir)$(ConfigurationName)"
            IntermediateDirectory="$(ConfigurationName)"
            ConfigurationType="1"
            CharacterSet="1"
            WholeProgramOptimization="1"
            >
            <Tool
                Name="VCPreBuildEventTool"
            />
            <Tool
                Name="VCCustomBuildTool"
            />
            <Tool
                Name="VCXMLDataGeneratorTool"
            />
            <Tool
                Name="VCWebServiceProxyGeneratorTool"
            />
            <Tool
                Name="VCMIDLTool"
            />
            <Tool
                Name="VCCLCompilerTool"
                AdditionalIncludeDirectories="D:\mariadb\include\mysql"
                PreprocessorDefinitions="WIN32;NDEBUG;_CONSOLE;_CRT_SECURE_NO_WARNINGS"
                RuntimeLibrary="2"
                UsePrecompiledHeader="2"
                WarningLevel="3"
                DebugInformationFormat="3"
            />
            <Tool
                Name="VCManagedResourceCompilerTool"
            />
            <Tool
                Name="VCResourceCompilerTool"
            />
            <Tool
                Name="VCPreLinkEventTool"
            />
            <Tool
                Name="VCLinkerTool"
                AdditionalDependencies="libmariadb.lib ws2_32.lib"
                LinkIncremental="1"
                AdditionalLibraryDirectories="D:\mariadb\lib"
                GenerateDebugInformation="true"
                SubSystem="1"
                OptimizeReferences="2"
                EnableCOMDATFolding="2"
                TargetMachine="1"
            />
            <Tool
                Name="VCALinkTool"
            />
            <Tool
                Name="VCManifestTool"
            />
            <Tool
                Name="VCXDCMakeTool"
            />
            <Tool
                Name="VCBscMakeTool"
            />
            <Tool
                Name="VCFxCopTool"
            />
            <Tool
                Name="VCAppVerifierTool"
            />
            <Tool
                Name="VCWebDeploymentTool"
            />
            <Tool
                Name="VCPostBuildEventTool"
            />
        </Configuration>
    </Configurations>
    <References>
    </References>
    <Files>
        <Filter
            Name="源文件"
            Filter="cpp;c;cc;cxx;def;odl;idl;hpj;bat;asm;asmx"
            UniqueIdentifier="{4FC737F1-C7A5-4376-A066-2A32D752A2FF}"
            >
            <File
                RelativePath="..\..\stdafx.cpp"
                >
                <FileConfiguration
                    Name="Debug|Win32"
                    >
                    <Tool
                        Name="VCCLCompilerTool"
                        UsePrecompiledHeader="1"
                    />
                </FileConfiguration>
                <FileConfiguration
                    Name="Release|Win32"
                    >
                    <Tool
                        Name="VCCLCompilerTool"
                        UsePrecompiledHeader="1"
                    />
                </FileConfiguration>
            </File>
            <File
                RelativePath="..\..\TestMariaDB.cpp"
                >
            </File>
        </Filter>
        <Filter
            Name="头文件"
            Filter="h;hpp;hxx;hm;inl;inc;xsd"
            UniqueIdentifier="{93995380-89BD-4b04-88EB-625FBE52EBFB}"
            >
            <File
                RelativePath="..\..\stdafx.h"
                >
            </File>
        </Filter>
        <Filter
            Name="资源文件"
            Filter="rc;ico;cur;bmp;dlg;rc2;rct;bin;rgs;gif;jpg;jpeg;jpe;resx;tiff;tif;png;wav"
            UniqueIdentifier="{67DA6AB6-F800-4c08-8B7A-83BB121AAD01}"
            >
        </Filter>
    </Files>
    <Globals>
    </Globals>
</VisualStudioProject>