znone
2021-03-25 9b81cdebba95f1b6e687191c630c1f8f6cf0df16
PostgreSQL: Support asynchronous operation
Fix bug in different versions of asio
7 files modified
1 files added
1113 ■■■■■ changed files
include/qtl_asio.hpp 76 ●●●●● patch | view | raw | blame | history
include/qtl_async.hpp 41 ●●●● patch | view | raw | blame | history
include/qtl_mysql.hpp 5 ●●●●● patch | view | raw | blame | history
include/qtl_postgres.hpp 753 ●●●●● patch | view | raw | blame | history
include/qtl_postgres_pool.hpp 26 ●●●●● patch | view | raw | blame | history
test/AsyncPostgres.cpp 150 ●●●●● patch | view | raw | blame | history
test/TestPostgres.cpp 44 ●●●●● patch | view | raw | blame | history
test/test_postgres.mak 18 ●●●●● patch | view | raw | blame | history
include/qtl_asio.hpp
@@ -1,7 +1,7 @@
#ifndef _QTL_ASIO_H_
#define _QTL_ASIO_H_
#include <qtl_async.hpp>
#include "qtl_async.hpp"
#include <asio/version.hpp>
#define ASIO_STANDALONE
#if ASIO_VERSION < 101200
@@ -84,6 +84,7 @@
                    else
                        handler(qtl::event::ef_exception);
                    _busying = false;
                    _timer.cancel();
                }));
                _busying = true;
            }
@@ -100,6 +101,7 @@
                        handler(qtl::event::ef_timeout);
                    else
                        handler(qtl::event::ef_exception);
                    _timer.cancel();
                    _busying = false;
                }));
                _busying = true;
@@ -197,8 +199,14 @@
inline ASIO_INITFN_RESULT_TYPE(OpenHandler, void(typename Connection::exception_type)) 
async_open(service& service, Connection& db, OpenHandler&& handler, Args&&... args)
{
#if ASIO_VERSION < 101200
    async_init_type<OpenHandler,
        void(typename Connection::exception_type)> init(std::forward<OpenHandler>(handler));
#else
    async_init_type<OpenHandler,
        void(typename Connection::exception_type)> init(handler);
#endif
    db.open(service, get_async_handler(init), std::forward<Args>(args)...);
    return init.result.get();
}
@@ -207,8 +215,14 @@
inline ASIO_INITFN_RESULT_TYPE(CloseHandler, void()) 
async_close(Connection& db, CloseHandler&& handler, Args&&... args)
{
#if ASIO_VERSION < 101200
    async_init_type<CloseHandler,
        void()> init(std::forward<CloseHandler>(std::forward<CloseHandler>(handler)));
#else
    async_init_type<CloseHandler,
        void()> init(std::forward<CloseHandler>(handler));
#endif
    db.close(get_async_handler(init), std::forward<Args>(args)...);
    return init.result.get();
}
@@ -217,8 +231,14 @@
inline ASIO_INITFN_RESULT_TYPE(ExecuteHandler, void(typename Connection::exception_type, uint64_t))  
async_execute(Connection& db, ExecuteHandler&& handler, Args&&... args)
{
#if ASIO_VERSION < 101200
    async_init_type<ExecuteHandler,
        void(typename Connection::exception_type, uint64_t)> init(std::forward<ExecuteHandler>(handler));
#else
    async_init_type<ExecuteHandler,
        void(typename Connection::exception_type, uint64_t)> init(handler);
#endif
    db.execute(get_async_handler(init), std::forward<Args>(args)...);
    return init.result.get();
}
@@ -227,8 +247,14 @@
inline ASIO_INITFN_RESULT_TYPE(ExecuteHandler, void(typename Connection::exception_type, uint64_t))  
async_execute_direct(Connection& db, ExecuteHandler&& handler, Args&&... args)
{
#if ASIO_VERSION < 101200
    async_init_type<ExecuteHandler,
        void(typename Connection::exception_type, uint64_t)> init(std::forward<ExecuteHandler>(handler));
#else
    async_init_type<ExecuteHandler,
        void(typename Connection::exception_type, uint64_t)> init(handler);
#endif
    db.execute_direct(get_async_handler(init), std::forward<Args>(args)...);
    return init.result.get();
}
@@ -237,8 +263,14 @@
inline ASIO_INITFN_RESULT_TYPE(ExecuteHandler, void(typename Connection::exception_type, uint64_t)) 
 async_insert(Connection& db, ExecuteHandler&& handler, Args&&... args)
{
#if ASIO_VERSION < 101200
    async_init_type<ExecuteHandler,
        void(typename Connection::exception_type, uint64_t)> init(std::forward<ExecuteHandler>(handler));
#else
    async_init_type<ExecuteHandler,
        void(typename Connection::exception_type, uint64_t)> init(handler);
#endif
    db.insert(get_async_handler(init), std::forward<Args>(args)...);
    return init.result.get();
}
@@ -247,8 +279,14 @@
inline ASIO_INITFN_RESULT_TYPE(ExecuteHandler, void(typename Connection::exception_type, uint64_t)) 
 async_insert_direct(Connection& db, ExecuteHandler&& handler, Args&&... args)
{
#if ASIO_VERSION < 101200
    async_init_type<ExecuteHandler,
        void(typename Connection::exception_type, uint64_t)> init(std::forward<ExecuteHandler>(handler));
#else
    async_init_type<ExecuteHandler,
        void(typename Connection::exception_type, uint64_t)> init(handler);
#endif
    db.insert_direct(get_async_handler(init), std::forward<Args>(args)...);
    return init.result.get();
}
@@ -257,8 +295,14 @@
inline ASIO_INITFN_RESULT_TYPE(FinishHandler, void(typename Connection::exception_type)) 
 async_query(Connection& db, FinishHandler&& handler, Args&&... args)
{
#if ASIO_VERSION < 101200
    async_init_type<FinishHandler,
        void(typename Connection::exception_type)> init(std::forward<FinishHandler>(handler));
#else
    async_init_type<FinishHandler,
        void(typename Connection::exception_type)> init(handler);
#endif
    db.query(std::forward<Args>(args)..., get_async_handler(init));
    return init.result.get();
}
@@ -267,8 +311,14 @@
inline ASIO_INITFN_RESULT_TYPE(FinishHandler, void(typename Connection::exception_type)) 
async_query_explicit(Connection& db, FinishHandler&& handler, Args&&... args)
{
#if ASIO_VERSION < 101200
    async_init_type<FinishHandler,
        void(typename Connection::exception_type)> init(std::forward<FinishHandler>(handler));
#else
    async_init_type<FinishHandler,
        void(typename Connection::exception_type)> init(handler);
#endif
    db.query_explicit(std::forward<Args>(args)..., get_async_handler(init));
    return init.result.get();
}
@@ -277,8 +327,14 @@
inline ASIO_INITFN_RESULT_TYPE(FinishHandler, void(typename Connection::exception_type))
async_query_multi_with_params(Connection& db, A1&& a1, A2&& a2, FinishHandler&& handler, RowHandlers&&... row_handlers)
{
#if ASIO_VERSION < 101200
    async_init_type<FinishHandler,
        void(typename Connection::exception_type)> init(std::forward<FinishHandler>(handler));
#else
    async_init_type<FinishHandler,
        void(typename Connection::exception_type)> init(handler);
#endif
    db.query_multi_with_params(std::forward<A1>(a1), std::forward<A2>(a2), get_async_handler(init), std::forward<RowHandlers>(row_handlers)...);
    return init.result.get();
}
@@ -287,8 +343,14 @@
inline ASIO_INITFN_RESULT_TYPE(FinishHandler, void(typename Connection::exception_type))
async_query_multi_with_params(Connection& db, A1&& a1, FinishHandler&& handler, RowHandlers&&... row_handlers)
{
#if ASIO_VERSION < 101200
    async_init_type<FinishHandler,
        void(typename Connection::exception_type)> init(std::forward<FinishHandler>(handler));
#else
    async_init_type<FinishHandler,
        void(typename Connection::exception_type)> init(handler);
#endif
    db.query_multi_with_params(std::forward<A1>(a1), get_async_handler(init), std::forward<RowHandlers>(row_handlers)...);
    return init.result.get();
}
@@ -297,8 +359,14 @@
inline ASIO_INITFN_RESULT_TYPE(FinishHandler, void(typename Connection::exception_type))
async_query_multi(Connection& db, A1&& a1, A2&& a2, FinishHandler&& handler, RowHandlers&&... row_handlers)
{
#if ASIO_VERSION < 101200
    async_init_type<FinishHandler,
        void(typename Connection::exception_type)> init(std::forward<FinishHandler>(handler));
#else
    async_init_type<FinishHandler,
        void(typename Connection::exception_type)> init(handler);
#endif
    db.query_multi(std::forward<A1>(a1), std::forward<A2>(a2), get_async_handler(init), std::forward<RowHandlers>(row_handlers)...);
    return init.result.get();
}
@@ -307,8 +375,14 @@
inline ASIO_INITFN_RESULT_TYPE(FinishHandler, void(typename Connection::exception_type))
async_query_multi(Connection& db, A1&& a1, FinishHandler&& handler, RowHandlers&&... row_handlers)
{
#if ASIO_VERSION < 101200
    async_init_type<FinishHandler,
        void(typename Connection::exception_type)> init(std::forward<FinishHandler>(handler));
#else
    async_init_type<FinishHandler,
        void(typename Connection::exception_type)> init(handler);
#endif
    db.query_multi(std::forward<A1>(a1), get_async_handler(init), std::forward<RowHandlers>(row_handlers)...);
    return init.result.get();
}
include/qtl_async.hpp
@@ -163,24 +163,14 @@
        pThis->open_command(query_text, text_length, [handler, params](const typename T::exception_type& e, std::shared_ptr<Command>& command) mutable {
            if(e)
            {
                handler(e, 0);
                command->close([command, handler](const typename T::exception_type& e) mutable {
                    if(e) handler(e, 0);
                command->close([command, e, handler](const typename T::exception_type& ae) mutable {
                    handler(e ? e : ae, 0);
                });
                return;
            }
            command->execute(params, [command, handler](const typename T::exception_type& e, uint64_t affected) mutable {
                if(e)
                {
                    handler(e, 0);
                    command->close([command, handler](const typename T::exception_type& e) mutable {
                        if(e) handler(e, 0);
                    });
                    return;
                }
                handler(typename T::exception_type(), affected);
                command->close([command, handler](const typename T::exception_type& e) mutable {
                    if(e) handler(e, 0);
                command->close([command, handler, e, affected](const typename T::exception_type& ae) mutable {
                    handler(e ? e : ae, affected);
                });
            });
        });
@@ -227,25 +217,18 @@
        pThis->open_command(query_text, text_length, [handler, params](const typename T::exception_type& e, std::shared_ptr<Command>& command) {
            if(e)
            {
                handler(e, 0);
                command->close([command, e, handler](const typename T::exception_type& ae) mutable {
                    handler(e ? e : ae, 0);
                });
            }
            else
            {
                command->execute(params, [command, handler](const typename T::exception_type& e, uint64_t affected) {
                    if(e)
                    {
                        handler(e, 0);
                    }
                    else if (affected > 0)
                    {
                        handler(typename T::exception_type(), command->insert_id());
                    }
                    else
                    {
                        handler(typename T::exception_type(), 0);
                    }
                    command->close([handler](const typename T::exception_type& e) {
                        handler(e, 0);
                    auto insert_id = 0;
                    if(!e && affected>0)
                        insert_id  = command->insert_id();
                    command->close([command, handler, e, insert_id](const typename T::exception_type& ae) mutable {
                        handler(e ? e : ae, insert_id);
                    });
                });
            }
include/qtl_mysql.hpp
@@ -3,6 +3,7 @@
#include <mysql.h>
#include <errmsg.h>
#include <time.h>
#include <memory.h>
#include <assert.h>
@@ -1187,7 +1188,7 @@
#if MARIADB_VERSION_ID >= 100000
int event_flags(int status) NOEXCEPT
inline int event_flags(int status) NOEXCEPT
{
    int flags = 0;
    if (status&MYSQL_WAIT_READ)
@@ -1199,7 +1200,7 @@
    return flags;
}
int mysql_status(int flags) NOEXCEPT
inline int mysql_status(int flags) NOEXCEPT
{
    int status = 0;
    if (flags&event::ef_read)
include/qtl_postgres.hpp
@@ -13,6 +13,7 @@
#include <algorithm>
#include <assert.h>
#include "qtl_common.hpp"
#include "qtl_async.hpp"
#define FRONTEND
@@ -26,7 +27,6 @@
extern "C"
{
#include <c.h>
#include <postgres.h>
#include <catalog/pg_type.h>
}
@@ -164,11 +164,11 @@
class error : public std::exception
{
public:
    error() : m_errmsg(nullptr) { }
    error() : m_errmsg() { }
    explicit error(PGconn* conn, PGVerbosity verbosity = PQERRORS_DEFAULT, PGContextVisibility show_context = PQSHOW_CONTEXT_ERRORS)
    {
        PQsetErrorVerbosity(conn, verbosity);
        PQsetErrorContextVisibility(conn, show_context);
        //PQsetErrorVerbosity(conn, verbosity);
        //PQsetErrorContextVisibility(conn, show_context);
        const char* errmsg = PQerrorMessage(conn);
        if (errmsg) m_errmsg = errmsg;
        else m_errmsg.clear();
@@ -182,9 +182,19 @@
    }
    virtual const char* what() const NOEXCEPT override { return m_errmsg.data(); }
    operator bool() const { return !m_errmsg.empty(); }
private:
protected:
    std::string m_errmsg;
};
class timeout : public error
{
public:
    timeout()
    {
        m_errmsg = "timeout";
    }
};
inline void verify_pgtypes_error(int ret)
@@ -992,7 +1002,7 @@
            if (end - data < size)
                throw std::overflow_error("insufficient data left in message");
            data = object_traits<T>::get(value, data, data + size);
            if (data >= end)
            if (data > end)
                throw std::overflow_error("insufficient data left in message");
            result.push_back(value);
        }
@@ -1494,6 +1504,17 @@
        }
    }
    template<ExecStatusType... Excepted>
    void verify_error(error& e)
    {
        if (m_res)
        {
            ExecStatusType got = status();
            if (!in<ExecStatusType, Excepted...>(got))
                e = error(m_res);
        }
    }
    void clear()
    {
        if (m_res)
@@ -1517,15 +1538,27 @@
     }
     base_statement(const base_statement&) = delete;
     base_statement(base_statement&& src) 
         : m_conn(src.m_conn), m_binders(std::move(src.m_binders)), m_res(std::move(src.m_res))
         : m_conn(src.m_conn), m_binders(std::move(src.m_binders)), m_res(std::move(src.m_res)), _name(std::move(src._name))
     {
     }
     base_statement& operator=(const base_statement&) = delete;
     base_statement& operator=(base_statement&& src)
     {
         if (this != &src)
         {
             close();
             m_conn = src.m_conn;
             m_binders = std::move(src.m_binders);
             m_res = std::move(src.m_res);
         }
         return *this;
     }
    result& get_result() { return m_res; }
    void close()
    {
        m_res=nullptr;
        m_res = nullptr;
    }
    uint64_t affetced_rows() const
@@ -1614,6 +1647,7 @@
protected:
    PGconn* m_conn;
    result m_res;
    std::string _name;
    std::vector<binder> m_binders;
    template<ExecStatusType... Excepted>
@@ -1624,6 +1658,13 @@
        else
            throw error(m_conn);
    }
    void finish(result& res)
    {
        while (res)
        {
            res = PQgetResult(m_conn);
        }
    }
};
class statement : public base_statement
@@ -1633,7 +1674,7 @@
    {
    }
    statement(const statement&) = delete;
    statement(statement&& src) : base_statement(std::move(src)), _name(std::move(src._name))
    statement(statement&& src) : base_statement(std::move(src))
    {
    }
@@ -1761,17 +1802,6 @@
        finish(m_res);
        m_res.clear();
    }
private:
    std::string _name;
    void finish(result& res)
    {
        while (res)
        {
            res = PQgetResult(m_conn);
        }
    }
};
class base_database
@@ -1783,6 +1813,8 @@
    }
public:
    typedef postgres::error exception_type;
    base_database(const base_database&) = delete;
    base_database(base_database&& src)
    {
@@ -1796,8 +1828,8 @@
            PQfinish(m_conn);
    }
    base_database& operator==(const base_database&) = delete;
    base_database& operator==(base_database&& src)
    base_database& operator=(const base_database&) = delete;
    base_database& operator=(base_database&& src)
    {
        if (this != &src)
        {
@@ -1882,12 +1914,10 @@
            PQreset(m_conn);
    }
    bool is_alive()
    void close()
    {
    }
    PGPing ping()
    {
        PQfinish(m_conn);
        m_conn = nullptr;
    }
protected:
@@ -1920,7 +1950,7 @@
                        m_res.get_column_type(j));
                }
                qtl::bind_record(*this, std::forward<decltype(values)>(values));
                proc(values);
                qtl::detail::apply(proc, std::forward<decltype(values)>(values));
            }
        }
    }
@@ -1959,12 +1989,6 @@
        sprintf(port_text, "%u", port);
        m_conn = PQsetdbLogin(host, port_text, options, nullptr, db, user, password);
        return m_conn != nullptr && status() == CONNECTION_OK;
    }
    void close()
    {
        PQfinish(m_conn);
        m_conn = nullptr;
    }
    statement open_command(const char* query_text, size_t /*text_length*/)
@@ -2023,8 +2047,669 @@
        simple_execute("COMMIT");
    }
    bool is_alive()
    {
        qtl::postgres::result res(PQexec(m_conn, ""));
        return res && res.status() == PGRES_COMMAND_OK;
    }
};
inline int event_flags(PostgresPollingStatusType status)
{
    int flags = 0;
    if (status == PGRES_POLLING_READING)
        flags |= event::ef_read;
    else if (status == PGRES_POLLING_WRITING)
        flags |= event::ef_write;
    else if (status == PGRES_POLLING_FAILED)
        flags |= event::ef_exception;
    return flags;
}
class async_connection;
template<typename Handler>
inline void async_wait(qtl::event* event, PGconn* conn, int timeout, Handler&& handler)
{
    int flushed = PQflush(conn);
    if (flushed < 0)
    {
        handler(error(conn));
        return;
    }
    if (flushed == 1)
    {
        event->set_io_handler(qtl::event::ef_read | qtl::event::ef_write, timeout,
            [event, conn, timeout, handler](int flags) mutable {
            if (flags&qtl::event::ef_timeout)
            {
                handler(postgres::timeout());
                return;
            }
            if (flags&qtl::event::ef_read)
            {
                if (!PQconsumeInput(conn))
                {
                    handler(error(conn));
                    return;
                }
            }
            if (flags&(qtl::event::ef_read | qtl::event::ef_write | event::ef_exception))
                async_wait(event, conn, timeout, handler);
        });
    }
    else
    {
        event->set_io_handler(qtl::event::ef_read, 10,
            [event, conn, timeout, handler](int flags) mutable {
            if (flags&qtl::event::ef_timeout)
            {
                handler(postgres::timeout());
            }
            else if (flags&(qtl::event::ef_read | qtl::event::ef_exception))
            {
                if (PQconsumeInput(conn))
                {
                    if (!PQisBusy(conn))
                        handler(postgres::error());
                    else
                        async_wait(event, conn, timeout, handler);
                }
                else
                {
                    handler(postgres::error(conn));
                }
            }
            else
            {
                handler(postgres::error(conn));
            }
        });
    }
}
class async_statement : public base_statement
{
public:
    async_statement(async_connection& db);
    async_statement(async_statement&& src)
        : base_statement(std::move(src)), m_timeout(2)
    {
        m_event = src.m_event;
        m_timeout = src.m_timeout;
        src.m_event = nullptr;
    }
    async_statement& operator=(async_statement&& src)
    {
        if (this != &src)
        {
            base_statement::operator =(std::move(src));
            m_event = src.m_event;
            m_timeout = src.m_timeout;
            src.m_event = nullptr;
        }
        return *this;
    }
    ~async_statement()
    {
        close();
    }
    /*
        Handler defiens as:
        void handler(const qtl::mysql::error& e);
     */
    template<typename Handler>
    void open(Handler&& handler, const char* command, int nParams = 0, const Oid *paramTypes = nullptr)
    {
        _name.resize(sizeof(intptr_t) * 2 + 1);
        int n = sprintf(const_cast<char*>(_name.data()), "q%p", this);
        _name.resize(n);
        std::transform(_name.begin(), _name.end(), _name.begin(), tolower);
        if (PQsendPrepare(m_conn, _name.data(), command, nParams, paramTypes))
        {
            async_wait([this, handler](error e) mutable {
                if (!e)
                {
                    m_res = PQgetResult(m_conn);
                    if (m_res)
                    {
                        m_res.verify_error<PGRES_COMMAND_OK>(e);
                        while(m_res)
                            m_res = PQgetResult(m_conn);
                    }
                }
                handler(e);
            });
        }
        else
        {
            _name.clear();
            handler(error(m_conn));
        }
    }
    template<typename Handler, typename... Types>
    void open(Handler&& handler, const char* command)
    {
        auto binder_list = make_binder_list(Types()...);
        std::array<Oid, sizeof...(Types)> types;
        std::transform(binder_list.begin(), binder_list.end(), types.begin(), [](const binder& b) {
            return b.type();
        });
        open(std::forward<Handler>(handler), command, types.size(), types.data());
    }
    void close()
    {
        while (m_res)
        {
            m_res = PQgetResult(m_conn);
        }
        if (!_name.empty())
        {
            std::ostringstream oss;
            oss << "DEALLOCATE " << _name << ";";
            result res = PQexec(m_conn, oss.str().data());
            error e;
            res.verify_error<PGRES_COMMAND_OK, PGRES_TUPLES_OK>(e);
            finish(res);
            if(e) throw e;
        }
        base_statement::close();
    }
    template<typename Handler>
    void close(Handler&& handler)
    {
        while (m_res)
        {
            if(PQisBusy(m_conn))
            {
                async_wait([this, handler](const error& e) mutable {
                    close(handler);
                });
            }
            else
            {
                m_res = PQgetResult(m_conn);
            }
        }
        if (!_name.empty() && PQstatus(m_conn) == CONNECTION_OK)
        {
            std::ostringstream oss;
            oss << "DEALLOCATE " << _name << ";";
            bool ok = PQsendQuery(m_conn, oss.str().data());
            if (ok)
            {
                async_wait([this, handler](postgres::error e) mutable {
                    if (PQstatus(m_conn) == CONNECTION_OK)
                    {
                        result res(PQgetResult(m_conn));
                        if (res)
                            res.verify_error<PGRES_COMMAND_OK, PGRES_TUPLES_OK>(e);
                        if (!e) _name.clear();
                        finish(res);
                        handler(e);
                    }
                    else
                    {
                        _name.clear();
                        handler(error());
                    }
                });
            }
            else
            {
                handler(error(m_conn));
            }
        }
        else
        {
            _name.clear();
        }
    }
    /*
        ExecuteHandler defiens as:
        void handler(const qtl::mysql::error& e, uint64_t affected);
     */
    template<typename ExecuteHandler>
    void execute(ExecuteHandler&& handler)
    {
        if (PQsendQueryPrepared(m_conn, _name.data(), 0, nullptr, nullptr, nullptr, 1) &&
            PQsetSingleRowMode(m_conn))
        {
            async_wait([this, handler](error e) {
                if (!e)
                {
                    m_res = PQgetResult(m_conn);
                    m_res.verify_error<PGRES_COMMAND_OK, PGRES_SINGLE_TUPLE>(e);
                    finish(m_res);
                }
                handler(e);
            });
        }
        else
        {
            handler(error(m_conn));
        }
    }
    template<typename Types, typename Handler>
    void execute(const Types& params, Handler&& handler)
    {
        const size_t count = qtl::params_binder<statement, Types>::size;
        if (count > 0)
        {
            m_binders.resize(count);
            qtl::bind_params(*this, params);
            std::array<const char*, count> values;
            std::array<int, count> lengths;
            std::array<int, count> formats;
            for (size_t i = 0; i != m_binders.size(); i++)
            {
                values[i] = m_binders[i].value();
                lengths[i] = static_cast<int>(m_binders[i].length());
                formats[i] = 1;
            }
            if (!PQsendQueryPrepared(m_conn, _name.data(), static_cast<int>(m_binders.size()), values.data(), lengths.data(), formats.data(), 1))
            {
                handler(error(m_conn), 0);
                return;
            }
        }
        else
        {
            if (!PQsendQueryPrepared(m_conn, _name.data(), 0, nullptr, nullptr, nullptr, 1))
            {
                handler(error(m_conn), 0);
                return;
            }
        }
        if (!PQsetSingleRowMode(m_conn))
        {
            handler(error(m_conn), 0);
            return;
        }
        if (PQisBusy(m_conn))
        {
            async_wait([this, handler](error e) mutable {
                if (!e)
                {
                    m_res = PQgetResult(m_conn);
                    m_res.verify_error<PGRES_COMMAND_OK, PGRES_SINGLE_TUPLE>(e);
                    int64_t affected = m_res.affected_rows();
                    finish(m_res);
                    handler(e, affected);
                }
                else
                {
                    handler(e, 0);
                }
            });
        }
    }
    template<typename Types, typename RowHandler, typename FinishHandler>
    void fetch(Types&& values, RowHandler&& row_handler, FinishHandler&& finish_handler)
    {
        if (m_res)
        {
            ExecStatusType status = m_res.status();
            if (status == PGRES_SINGLE_TUPLE)
            {
                int count = m_res.get_column_count();
                if (count > 0)
                {
                    m_binders.resize(count);
                    for (int i = 0; i != count; i++)
                    {
                        m_binders[i] = binder(m_res.get_value(0, i), m_res.length(0, i),
                            m_res.get_column_type(i));
                    }
                    qtl::bind_record(*this, std::forward<Types>(values));
                }
                row_handler();
                if (PQisBusy(m_conn))
                {
                    async_wait([this, &values, row_handler, finish_handler](const error& e) {
                        if (e)
                        {
                            finish_handler(e);
                        }
                        else
                        {
                            m_res = PQgetResult(m_conn);
                            fetch(std::forward<Types>(values), row_handler, finish_handler);
                        }
                    });
                }
                else
                {
                    m_res = PQgetResult(m_conn);
                    fetch(std::forward<Types>(values), row_handler, finish_handler);
                }
            }
            else
            {
                error e;
                m_res.verify_error<PGRES_TUPLES_OK>(e);
                finish_handler(e);
            }
        }
        else
        {
            finish_handler(error());
        }
    }
    template<typename Handler>
    void next_result(Handler&& handler)
    {
        async_wait([this, handler](const error& e) {
            if (e)
            {
                handler(e);
            }
            else
            {
                m_res = PQgetResult(m_conn);
                handler(error());
            }
        });
    }
private:
    event* m_event;
    int m_timeout;
    template<typename Handler>
    void async_wait(Handler&& handler)
    {
        qtl::postgres::async_wait(m_event, m_conn, m_timeout, std::forward<Handler>(handler));
    }
};
class async_connection : public base_database, public qtl::async_connection<async_connection, async_statement>
{
public:
    async_connection() : m_connect_timeout(2), m_query_timeout(2)
    {
    }
    async_connection(async_connection&& src)
        : base_database(std::move(src)), m_connect_timeout(src.m_connect_timeout), m_query_timeout(src.m_query_timeout)
    {
    }
    async_connection& operator=(async_connection&& src)
    {
        if (this != &src)
        {
            base_database::operator=(std::move(src));
            m_connect_timeout = src.m_connect_timeout;
            m_query_timeout = src.m_query_timeout;
        }
        return *this;
    }
    /*
        OpenHandler defines as:
            void handler(const qtl::postgres::error& e) NOEXCEPT;
    */
    template<typename EventLoop, typename OpenHandler>
    void open(EventLoop& ev, OpenHandler&& handler, const std::map<std::string, std::string>& params, bool expand_dbname = false)
    {
        std::vector<const char*> keywords;
        std::vector<const char*> values;
        keywords.reserve(params.size());
        values.reserve(params.size());
        for (auto& param : params)
        {
            keywords.push_back(param.first.data());
            values.push_back(param.second.data());
        }
        keywords.push_back(nullptr);
        values.push_back(nullptr);
        m_conn = PQconnectStartParams(keywords.data(), values.data(), expand_dbname);
        if (m_conn == nullptr)
            throw std::bad_alloc();
        if (status() == CONNECTION_BAD)
        {
            handler(error(m_conn));
            return;
        }
        if (PQsetnonblocking(m_conn, true)!=0)
            handler(error(m_conn));
        get_options();
        bind(ev);
        wait_connect(std::forward<OpenHandler>(handler));
    }
    template<typename EventLoop, typename OpenHandler>
    void open(EventLoop& ev, OpenHandler&& handler, const char * conninfo)
    {
        m_conn = PQconnectStart(conninfo);
        if (m_conn == nullptr)
            throw std::bad_alloc();
        if (status() == CONNECTION_BAD)
        {
            handler(error(m_conn));
            return;
        }
        PQsetnonblocking(m_conn, true);
        get_options();
        bind(ev);
        wait_connect(std::forward<OpenHandler>(handler));
    }
    template<typename OpenHandler>
    void reset(OpenHandler&& handler)
    {
        PQresetStart(m_conn);
        wait_reset(std::forward<OpenHandler>(handler));
    }
    /*
        Handler defines as:
            void handler(const qtl::mysql::error& e, uint64_t affected) NOEXCEPT;
    */
    template<typename ExecuteHandler>
    void simple_execute(ExecuteHandler&& handler, const char* query_text) NOEXCEPT
    {
        bool ok = PQsendQuery(m_conn, query_text);
        if (ok)
        {
            async_wait([this, handler](postgres::error e) mutable {
                result res(PQgetResult(m_conn));
                res.verify_error<PGRES_COMMAND_OK, PGRES_TUPLES_OK>(e);
                uint64_t affected = res.affected_rows();
                handler(e, affected);
                while (res)
                    res = PQgetResult(m_conn);
            });
        }
        else
        {
            handler(error(m_conn), 0);
        }
    }
    template<typename Handler>
    void auto_commit(Handler&& handler, bool on) NOEXCEPT
    {
        simple_execute(std::forward<Handler>(handler),
            on ? "SET AUTOCOMMIT TO ON" : "SET AUTOCOMMIT TO OFF");
    }
    template<typename Handler>
    void begin_transaction(Handler&& handler) NOEXCEPT
    {
        simple_execute(std::forward<Handler>(handler), "BEGIN");
    }
    template<typename Handler>
    void rollback(Handler&& handler) NOEXCEPT
    {
        simple_execute(std::forward<Handler>(handler), "ROLLBACK");
    }
    template<typename Handler>
    void commit(Handler&& handler) NOEXCEPT
    {
        simple_execute(std::forward<Handler>(handler), "COMMIT");
    }
    /*
        ResultHandler defines as:
            void result_handler(const qtl::postgres::error& e) NOEXCEPT;
    */
    template<typename RowHandler, typename ResultHandler>
    void simple_query(const char* query, RowHandler&& row_handler, ResultHandler&& result_handler) NOEXCEPT
    {
        bool ok = PQsendQuery(m_conn, query);
        if (ok)
        {
            async_wait([this, row_handler, result_handler](postgres::error e) mutable {
                result res(PQgetResult(m_conn));
                res.verify_error<PGRES_COMMAND_OK, PGRES_TUPLES_OK>(e);
                if (e)
                {
                    result_handler(e, 0);
                    return;
                }
                uint64_t affected = res.affected_rows();
                while (res && res.status() == PGRES_TUPLES_OK)
                {
                    simple_statment stmt(*this, std::move(res));
                    stmt.fetch_all(row_handler);
                    res = PQgetResult(m_conn);
                }
                result_handler(e, affected);
            });
        }
        else
        {
            result_handler(error(m_conn), 0);
        }
    }
    template<typename Handler>
    void open_command(const char* query_text, size_t /*text_length*/, Handler&& handler)
    {
        std::shared_ptr<async_statement> stmt = std::make_shared<async_statement>(*this);
        stmt->open([stmt, handler](const postgres::error& e) mutable {
            handler(e, stmt);
        }, query_text, 0);
    }
    template<typename Handler>
    bool is_alive(Handler&& handler) NOEXCEPT
    {
        simple_execute(std::forward<Handler>(handler), "");
    }
    socket_type socket() const NOEXCEPT { return PQsocket(m_conn); }
    int connect_timeout() const { return m_connect_timeout; }
    void connect_timeout(int timeout) { m_connect_timeout = timeout; }
    int query_timeout() const { return m_query_timeout; }
    void query_timeout(int timeout) { m_query_timeout = timeout; }
private:
    int m_connect_timeout;
    int m_query_timeout;
    void get_options()
    {
        PQconninfoOption* options = PQconninfo(m_conn);
        m_connect_timeout = 2;
        for (PQconninfoOption* option = options; option; option++)
        {
            if (strcmp(option->keyword, "connect_timeout") == 0)
            {
                if (option->val)
                    m_connect_timeout = atoi(option->val);
                break;
            }
        }
        PQconninfoFree(options);
    }
    template<typename OpenHandler>
    void wait_connect(OpenHandler&& handler) NOEXCEPT
    {
        PostgresPollingStatusType status = PQconnectPoll(m_conn);
        switch (status)
        {
        case PGRES_POLLING_READING:
        case PGRES_POLLING_WRITING:
            m_event_handler->set_io_handler(event_flags(status), m_connect_timeout,
                [this, handler](int flags) mutable {
                if (flags&event::ef_timeout)
                {
                    handler(postgres::timeout());
                }
                else if(flags&(event::ef_read|event::ef_write | event::ef_exception))
                    wait_connect(std::forward<OpenHandler>(handler));
            });
            break;
        case PGRES_POLLING_FAILED:
            handler(postgres::error(handle()));
            break;
        case PGRES_POLLING_OK:
            //PQsetnonblocking(m_conn, true);
            handler(postgres::error());
        }
    }
    template<typename OpenHandler>
    void wait_reset(OpenHandler&& handler) NOEXCEPT
    {
        PostgresPollingStatusType status = PQresetPoll(m_conn);
        switch (status)
        {
        case PGRES_POLLING_READING:
        case PGRES_POLLING_WRITING:
            m_event_handler->set_io_handler(event_flags(status), m_connect_timeout,
                [this, handler](int flags) mutable {
                if (flags&event::ef_timeout)
                {
                    handler(postgres::timeout());
                }
                else if (flags&(event::ef_read | event::ef_write | event::ef_exception))
                    wait_reset(std::forward<OpenHandler>(handler));
            });
            break;
        case PGRES_POLLING_FAILED:
            handler(postgres::error(m_conn));
            break;
        case PGRES_POLLING_OK:
            handler(postgres::error());
        }
    }
    template<typename Handler>
    void async_wait(Handler&& handler)
    {
        qtl::postgres::async_wait(event(), m_conn, m_query_timeout, std::forward<Handler>(handler));
    }
};
inline async_statement::async_statement(async_connection& db)
    : base_statement(static_cast<base_database&>(db))
{
    m_event = db.event();
    m_timeout = db.query_timeout();
}
typedef qtl::transaction<database> transaction;
template<typename Record>
include/qtl_postgres_pool.hpp
@@ -38,6 +38,32 @@
    std::string m_password;
};
template<typename EventLoop>
class async_pool : public qtl::async_pool<async_pool<EventLoop>, EventLoop, async_connection>
{
    typedef qtl::async_pool<async_pool<EventLoop>, EventLoop, async_connection> base_class;
public:
    async_pool(EventLoop& ev) : base_class(ev) { }
    virtual ~async_pool() { }
    template<typename Handler>
    void new_connection(EventLoop& ev, Handler&& handler) throw()
    {
        async_connection* db = new async_connection;
        db->open(ev, [this, handler, db](const postgres::error& e) mutable {
            if (e)
            {
                delete db;
                db = nullptr;
            }
            handler(e, db);
        }, m_params);
    }
protected:
    std::map<std::string, std::string> m_params;
};
}
}
test/AsyncPostgres.cpp
New file
@@ -0,0 +1,150 @@
#include "stdafx.h"
#include "../include/qtl_postgres.hpp"
#include <vector>
#include <thread>
#include <system_error>
#include <time.h>
#include <limits.h>
#include "../include/qtl_postgres_pool.hpp"
#include "../include/qtl_asio.hpp"
using namespace qtl::postgres;
void LogError(const error& e)
{
    fprintf(stderr, "PostgreSQL Error: %s\n", e.what());
}
const char postgres_server[] = "localhost";
const char postgres_user[] = "postgres";
const char postgres_password[] = "111111";
const char postgres_database[] = "test";
qtl::asio::service service;
void SimpleTest()
{
    async_connection connection;
    service.reset();
    std::map<std::string, std::string> params;
    params["host"] = postgres_server;
    params["dbname"] = postgres_database;
    params["user"] = postgres_user;
    params["password"] = postgres_password;
    connection.open(service, [&connection](const error& e) {
        if (e)
        {
            LogError(e);
            service.stop();
        }
        else
        {
            printf("Connect to PostgreSQL ok.\n");
            connection.simple_query("select id, name from test", [](const std::string& id, const std::string& name) {
                printf("%s\t%s\n", id.data(), name.data());
                return true;
            }, [&connection](const error& e, size_t row_count) {
                if (e)
                    LogError(e);
                else
                    printf("Total %lu rows.\n", row_count);
                connection.close();
            });
        }
    }, params);
    service.run();
}
void insert(async_connection& connection, int next)
{
    qtl::asio::async_execute(connection, [&connection, next](const error& e, uint64_t affected) mutable {
        if (e)
        {
            LogError(e);
            service.stop();
        }
        else
        {
            if (next)
            {
                --next;
                asio::post(service.context(), [&connection, next]() {
                    insert(connection, next);
                });
            }
            else
            {
                service.stop();
            }
        }
    }, "insert into test(name, createtime) values($1, LOCALTIMESTAMP)", 0, "test name");
}
void ExecuteTest()
{
    async_connection connection;
    service.reset();
    std::map<std::string, std::string> params;
    params["host"] = postgres_server;
    params["dbname"] = postgres_database;
    params["user"] = postgres_user;
    params["password"] = postgres_password;
    qtl::asio::async_open(service, connection, [&connection](const error& e) {
        if (e)
        {
            LogError(e);
            service.stop();
        }
        else
        {
            printf("Connect to PostgreSQL ok.\n");
            insert(connection, 10);
        }
    }, params);
    service.run();
};
void QueryTest()
{
    async_connection connection;
    service.reset();
    std::map<std::string, std::string> params;
    params["host"] = postgres_server;
    params["dbname"] = postgres_database;
    params["user"] = postgres_user;
    params["password"] = postgres_password;
    connection.open(service, [&connection](const error& e) {
        if (e)
        {
            LogError(e);
            service.stop();
        }
        else
        {
            printf("Connect to PostgreSQL ok.\n");
            connection.query("select id, name, CreateTime from test",
                [](int32_t id, const std::string& name, const qtl::postgres::timestamp& create_time) {
                printf("%d\t%s\t%s\n", id, name.data(), create_time.to_string().data());
            }, [&connection](const error& e) {
                printf("query has completed.\n");
                if (e)
                    LogError(e);
                connection.close();
            });
        }
    }, params);
    service.run();
}
int main(int argc, char* argv[])
{
    ExecuteTest();
    SimpleTest();
    QueryTest();
    return 0;
}
test/TestPostgres.cpp
@@ -73,6 +73,10 @@
    {
        ASSERT_EXCEPTION(e);
    }
    catch (std::exception& e)
    {
        ASSERT_EXCEPTION(e);
    }
}
void TestPostgres::test_select()
@@ -83,7 +87,7 @@
    try
    {
        db.query("select * from test where id=$1", 0, id,
            [](const qtl::indicator<int32_t>& id, const std::string& name, const qtl::postgres::timestamp& create_time) {
            [](const qtl::indicator<int32_t>& id, const std::string& name, const qtl::postgres::timestamp& create_time, const std::vector<int32_t>& v, const std::tuple<int, int>& pt) {
            printf("ID=\"%d\", Name=\"%s\"\n",
                id.data, name.data());
        });
@@ -101,6 +105,10 @@
    {
        ASSERT_EXCEPTION(e);
    }
    catch (std::exception& e)
    {
        ASSERT_EXCEPTION(e);
    }
}
void TestPostgres::test_insert()
@@ -110,10 +118,15 @@
    try
    {
        db.query_first("insert into test(Name, CreateTime) values($1, LOCALTIMESTAMP) returning ID",
            "test_user", id);
        int32_t va[] = { 200, 300, 400 };
        db.query_first("insert into test(Name, CreateTime, va, percent) values($1, LOCALTIMESTAMP, $2, $3) returning ID",
            std::forward_as_tuple("test_user", va, std::make_pair(11, 22)), id);
    }
    catch (qtl::postgres::error& e)
    {
        ASSERT_EXCEPTION(e);
    }
    catch (std::exception& e)
    {
        ASSERT_EXCEPTION(e);
    }
@@ -136,6 +149,10 @@
    {
        ASSERT_EXCEPTION(e);
    }
    catch (std::exception& e)
    {
        ASSERT_EXCEPTION(e);
    }
}
void TestPostgres::test_update()
@@ -152,7 +169,10 @@
    {
        ASSERT_EXCEPTION(e);
    }
    TEST_ASSERT_MSG(id > 0, "insert failture.");
    catch (std::exception& e)
    {
        ASSERT_EXCEPTION(e);
    }
}
void TestPostgres::test_clear()
@@ -165,6 +185,10 @@
        db.simple_execute("delete from test");
    }
    catch (qtl::postgres::error& e)
    {
        ASSERT_EXCEPTION(e);
    }
    catch (std::exception& e)
    {
        ASSERT_EXCEPTION(e);
    }
@@ -185,6 +209,10 @@
        }
    }
    catch (qtl::postgres::error& e)
    {
        ASSERT_EXCEPTION(e);
    }
    catch (std::exception& e)
    {
        ASSERT_EXCEPTION(e);
    }
@@ -233,6 +261,10 @@
    {
        ASSERT_EXCEPTION(e);
    }
    catch (std::bad_cast& e)
    {
        ASSERT_EXCEPTION(e);
    }
}
void TestPostgres::test_select_blob()
@@ -271,6 +303,10 @@
    {
        ASSERT_EXCEPTION(e);
    }
    catch (std::bad_cast& e)
    {
        ASSERT_EXCEPTION(e);
    }
}
void TestPostgres::test_any()
test/test_postgres.mak
@@ -1,24 +1,30 @@
TARGET=test_postgres
TARGET=test_postgres async_postgres
CC=g++
PCH_HEADER=stdafx.h
PCH=stdafx.h.gch
OBJ=TestPostgres.o md5.o
CFLAGS=-g -D_DEBUG -O2-I/usr/include -I/usr/local/include -I$(shell pg_config --includedir) -I$(shell pg_config --includedir-server )
OBJ=TestPostgres.o AsyncPostgres.o md5.o
CFLAGS=-g -D_DEBUG -O2 -I/usr/include -I/usr/local/include -I$(shell pg_config --includedir) -I$(shell pg_config --includedir-server )
CXXFLAGS= -I../include -std=c++11
LDFLAGS= -L$(shell pg_config --libdir) -lcpptest -lpq -lpgtypes
LDFLAGS= -L$(shell pg_config --libdir) -pthread -lcpptest -lpq -lpgtypes
all : $(TARGET)
$(PCH) : $(PCH_HEADER)
    $(CC) $(CFLAGS) $(CXXFLAGS) -x c++-header -o $@ $<
TestPostgres.o : TestPostgres.cpp $(PCH)
TestPostgres.o : TestPostgres.cpp TestPostgres.h $(PCH)
    $(CC) -c $(CFLAGS) $(CXXFLAGS) -o $@ $<
AsyncPostgres.o : AsyncPostgres.cpp $(PCH)
    $(CC) -c $(CFLAGS) $(CXXFLAGS) -o $@ $< 
    
md5.o : md5.c md5.h
    gcc -c $(CFLAGS) -o $@ $<
$(TARGET) : $(OBJ)
test_postgres : TestPostgres.o md5.o
    libtool --tag=CXX --mode=link $(CC) $(LDFLAGS) -o $@ $^
async_postgres : AsyncPostgres.o md5.o
    libtool --tag=CXX --mode=link $(CC) $(LDFLAGS) -o $@ $^
clean: