From 9b81cdebba95f1b6e687191c630c1f8f6cf0df16 Mon Sep 17 00:00:00 2001
From: znone <glyc@sina.com.cn>
Date: Thu, 25 Mar 2021 12:44:30 +0000
Subject: [PATCH] PostgreSQL: Support asynchronous operation Fix bug in different versions of asio
---
test/TestPostgres.cpp | 44 ++
test/AsyncPostgres.cpp | 150 +++++++
include/qtl_mysql.hpp | 5
include/qtl_async.hpp | 41 -
include/qtl_postgres.hpp | 753 +++++++++++++++++++++++++++++++++++++-
include/qtl_postgres_pool.hpp | 26 +
include/qtl_asio.hpp | 76 +++
test/test_postgres.mak | 18
8 files changed, 1,037 insertions(+), 76 deletions(-)
diff --git a/include/qtl_asio.hpp b/include/qtl_asio.hpp
index ee8fae2..9bfe86a 100644
--- a/include/qtl_asio.hpp
+++ b/include/qtl_asio.hpp
@@ -1,7 +1,7 @@
#ifndef _QTL_ASIO_H_
#define _QTL_ASIO_H_
-#include <qtl_async.hpp>
+#include "qtl_async.hpp"
#include <asio/version.hpp>
#define ASIO_STANDALONE
#if ASIO_VERSION < 101200
@@ -84,6 +84,7 @@
else
handler(qtl::event::ef_exception);
_busying = false;
+ _timer.cancel();
}));
_busying = true;
}
@@ -100,6 +101,7 @@
handler(qtl::event::ef_timeout);
else
handler(qtl::event::ef_exception);
+ _timer.cancel();
_busying = false;
}));
_busying = true;
@@ -197,8 +199,14 @@
inline ASIO_INITFN_RESULT_TYPE(OpenHandler, void(typename Connection::exception_type))
async_open(service& service, Connection& db, OpenHandler&& handler, Args&&... args)
{
+#if ASIO_VERSION < 101200
async_init_type<OpenHandler,
void(typename Connection::exception_type)> init(std::forward<OpenHandler>(handler));
+#else
+ async_init_type<OpenHandler,
+ void(typename Connection::exception_type)> init(handler);
+#endif
+
db.open(service, get_async_handler(init), std::forward<Args>(args)...);
return init.result.get();
}
@@ -207,8 +215,14 @@
inline ASIO_INITFN_RESULT_TYPE(CloseHandler, void())
async_close(Connection& db, CloseHandler&& handler, Args&&... args)
{
+#if ASIO_VERSION < 101200
+ async_init_type<CloseHandler,
+ void()> init(std::forward<CloseHandler>(std::forward<CloseHandler>(handler)));
+#else
async_init_type<CloseHandler,
void()> init(std::forward<CloseHandler>(handler));
+#endif
+
db.close(get_async_handler(init), std::forward<Args>(args)...);
return init.result.get();
}
@@ -217,8 +231,14 @@
inline ASIO_INITFN_RESULT_TYPE(ExecuteHandler, void(typename Connection::exception_type, uint64_t))
async_execute(Connection& db, ExecuteHandler&& handler, Args&&... args)
{
+#if ASIO_VERSION < 101200
async_init_type<ExecuteHandler,
void(typename Connection::exception_type, uint64_t)> init(std::forward<ExecuteHandler>(handler));
+#else
+ async_init_type<ExecuteHandler,
+ void(typename Connection::exception_type, uint64_t)> init(handler);
+#endif
+
db.execute(get_async_handler(init), std::forward<Args>(args)...);
return init.result.get();
}
@@ -227,8 +247,14 @@
inline ASIO_INITFN_RESULT_TYPE(ExecuteHandler, void(typename Connection::exception_type, uint64_t))
async_execute_direct(Connection& db, ExecuteHandler&& handler, Args&&... args)
{
+#if ASIO_VERSION < 101200
async_init_type<ExecuteHandler,
void(typename Connection::exception_type, uint64_t)> init(std::forward<ExecuteHandler>(handler));
+#else
+ async_init_type<ExecuteHandler,
+ void(typename Connection::exception_type, uint64_t)> init(handler);
+#endif
+
db.execute_direct(get_async_handler(init), std::forward<Args>(args)...);
return init.result.get();
}
@@ -237,8 +263,14 @@
inline ASIO_INITFN_RESULT_TYPE(ExecuteHandler, void(typename Connection::exception_type, uint64_t))
async_insert(Connection& db, ExecuteHandler&& handler, Args&&... args)
{
+#if ASIO_VERSION < 101200
async_init_type<ExecuteHandler,
void(typename Connection::exception_type, uint64_t)> init(std::forward<ExecuteHandler>(handler));
+#else
+ async_init_type<ExecuteHandler,
+ void(typename Connection::exception_type, uint64_t)> init(handler);
+#endif
+
db.insert(get_async_handler(init), std::forward<Args>(args)...);
return init.result.get();
}
@@ -247,8 +279,14 @@
inline ASIO_INITFN_RESULT_TYPE(ExecuteHandler, void(typename Connection::exception_type, uint64_t))
async_insert_direct(Connection& db, ExecuteHandler&& handler, Args&&... args)
{
+#if ASIO_VERSION < 101200
async_init_type<ExecuteHandler,
void(typename Connection::exception_type, uint64_t)> init(std::forward<ExecuteHandler>(handler));
+#else
+ async_init_type<ExecuteHandler,
+ void(typename Connection::exception_type, uint64_t)> init(handler);
+#endif
+
db.insert_direct(get_async_handler(init), std::forward<Args>(args)...);
return init.result.get();
}
@@ -257,8 +295,14 @@
inline ASIO_INITFN_RESULT_TYPE(FinishHandler, void(typename Connection::exception_type))
async_query(Connection& db, FinishHandler&& handler, Args&&... args)
{
+#if ASIO_VERSION < 101200
async_init_type<FinishHandler,
void(typename Connection::exception_type)> init(std::forward<FinishHandler>(handler));
+#else
+ async_init_type<FinishHandler,
+ void(typename Connection::exception_type)> init(handler);
+#endif
+
db.query(std::forward<Args>(args)..., get_async_handler(init));
return init.result.get();
}
@@ -267,8 +311,14 @@
inline ASIO_INITFN_RESULT_TYPE(FinishHandler, void(typename Connection::exception_type))
async_query_explicit(Connection& db, FinishHandler&& handler, Args&&... args)
{
+#if ASIO_VERSION < 101200
async_init_type<FinishHandler,
void(typename Connection::exception_type)> init(std::forward<FinishHandler>(handler));
+#else
+ async_init_type<FinishHandler,
+ void(typename Connection::exception_type)> init(handler);
+#endif
+
db.query_explicit(std::forward<Args>(args)..., get_async_handler(init));
return init.result.get();
}
@@ -277,8 +327,14 @@
inline ASIO_INITFN_RESULT_TYPE(FinishHandler, void(typename Connection::exception_type))
async_query_multi_with_params(Connection& db, A1&& a1, A2&& a2, FinishHandler&& handler, RowHandlers&&... row_handlers)
{
+#if ASIO_VERSION < 101200
async_init_type<FinishHandler,
void(typename Connection::exception_type)> init(std::forward<FinishHandler>(handler));
+#else
+ async_init_type<FinishHandler,
+ void(typename Connection::exception_type)> init(handler);
+#endif
+
db.query_multi_with_params(std::forward<A1>(a1), std::forward<A2>(a2), get_async_handler(init), std::forward<RowHandlers>(row_handlers)...);
return init.result.get();
}
@@ -287,8 +343,14 @@
inline ASIO_INITFN_RESULT_TYPE(FinishHandler, void(typename Connection::exception_type))
async_query_multi_with_params(Connection& db, A1&& a1, FinishHandler&& handler, RowHandlers&&... row_handlers)
{
+#if ASIO_VERSION < 101200
async_init_type<FinishHandler,
void(typename Connection::exception_type)> init(std::forward<FinishHandler>(handler));
+#else
+ async_init_type<FinishHandler,
+ void(typename Connection::exception_type)> init(handler);
+#endif
+
db.query_multi_with_params(std::forward<A1>(a1), get_async_handler(init), std::forward<RowHandlers>(row_handlers)...);
return init.result.get();
}
@@ -297,8 +359,14 @@
inline ASIO_INITFN_RESULT_TYPE(FinishHandler, void(typename Connection::exception_type))
async_query_multi(Connection& db, A1&& a1, A2&& a2, FinishHandler&& handler, RowHandlers&&... row_handlers)
{
+#if ASIO_VERSION < 101200
async_init_type<FinishHandler,
void(typename Connection::exception_type)> init(std::forward<FinishHandler>(handler));
+#else
+ async_init_type<FinishHandler,
+ void(typename Connection::exception_type)> init(handler);
+#endif
+
db.query_multi(std::forward<A1>(a1), std::forward<A2>(a2), get_async_handler(init), std::forward<RowHandlers>(row_handlers)...);
return init.result.get();
}
@@ -307,8 +375,14 @@
inline ASIO_INITFN_RESULT_TYPE(FinishHandler, void(typename Connection::exception_type))
async_query_multi(Connection& db, A1&& a1, FinishHandler&& handler, RowHandlers&&... row_handlers)
{
+#if ASIO_VERSION < 101200
async_init_type<FinishHandler,
void(typename Connection::exception_type)> init(std::forward<FinishHandler>(handler));
+#else
+ async_init_type<FinishHandler,
+ void(typename Connection::exception_type)> init(handler);
+#endif
+
db.query_multi(std::forward<A1>(a1), get_async_handler(init), std::forward<RowHandlers>(row_handlers)...);
return init.result.get();
}
diff --git a/include/qtl_async.hpp b/include/qtl_async.hpp
index ec877ea..eeb98d5 100644
--- a/include/qtl_async.hpp
+++ b/include/qtl_async.hpp
@@ -163,24 +163,14 @@
pThis->open_command(query_text, text_length, [handler, params](const typename T::exception_type& e, std::shared_ptr<Command>& command) mutable {
if(e)
{
- handler(e, 0);
- command->close([command, handler](const typename T::exception_type& e) mutable {
- if(e) handler(e, 0);
+ command->close([command, e, handler](const typename T::exception_type& ae) mutable {
+ handler(e ? e : ae, 0);
});
return;
}
command->execute(params, [command, handler](const typename T::exception_type& e, uint64_t affected) mutable {
- if(e)
- {
- handler(e, 0);
- command->close([command, handler](const typename T::exception_type& e) mutable {
- if(e) handler(e, 0);
- });
- return;
- }
- handler(typename T::exception_type(), affected);
- command->close([command, handler](const typename T::exception_type& e) mutable {
- if(e) handler(e, 0);
+ command->close([command, handler, e, affected](const typename T::exception_type& ae) mutable {
+ handler(e ? e : ae, affected);
});
});
});
@@ -227,25 +217,18 @@
pThis->open_command(query_text, text_length, [handler, params](const typename T::exception_type& e, std::shared_ptr<Command>& command) {
if(e)
{
- handler(e, 0);
+ command->close([command, e, handler](const typename T::exception_type& ae) mutable {
+ handler(e ? e : ae, 0);
+ });
}
else
{
command->execute(params, [command, handler](const typename T::exception_type& e, uint64_t affected) {
- if(e)
- {
- handler(e, 0);
- }
- else if (affected > 0)
- {
- handler(typename T::exception_type(), command->insert_id());
- }
- else
- {
- handler(typename T::exception_type(), 0);
- }
- command->close([handler](const typename T::exception_type& e) {
- handler(e, 0);
+ auto insert_id = 0;
+ if(!e && affected>0)
+ insert_id = command->insert_id();
+ command->close([command, handler, e, insert_id](const typename T::exception_type& ae) mutable {
+ handler(e ? e : ae, insert_id);
});
});
}
diff --git a/include/qtl_mysql.hpp b/include/qtl_mysql.hpp
index a8ffb34..1ab6929 100644
--- a/include/qtl_mysql.hpp
+++ b/include/qtl_mysql.hpp
@@ -3,6 +3,7 @@
#include <mysql.h>
#include <errmsg.h>
+
#include <time.h>
#include <memory.h>
#include <assert.h>
@@ -1187,7 +1188,7 @@
#if MARIADB_VERSION_ID >= 100000
-int event_flags(int status) NOEXCEPT
+inline int event_flags(int status) NOEXCEPT
{
int flags = 0;
if (status&MYSQL_WAIT_READ)
@@ -1199,7 +1200,7 @@
return flags;
}
-int mysql_status(int flags) NOEXCEPT
+inline int mysql_status(int flags) NOEXCEPT
{
int status = 0;
if (flags&event::ef_read)
diff --git a/include/qtl_postgres.hpp b/include/qtl_postgres.hpp
index 4bc2148..3418c22 100644
--- a/include/qtl_postgres.hpp
+++ b/include/qtl_postgres.hpp
@@ -13,6 +13,7 @@
#include <algorithm>
#include <assert.h>
#include "qtl_common.hpp"
+#include "qtl_async.hpp"
#define FRONTEND
@@ -26,7 +27,6 @@
extern "C"
{
-#include <c.h>
#include <postgres.h>
#include <catalog/pg_type.h>
}
@@ -164,11 +164,11 @@
class error : public std::exception
{
public:
- error() : m_errmsg(nullptr) { }
+ error() : m_errmsg() { }
explicit error(PGconn* conn, PGVerbosity verbosity = PQERRORS_DEFAULT, PGContextVisibility show_context = PQSHOW_CONTEXT_ERRORS)
{
- PQsetErrorVerbosity(conn, verbosity);
- PQsetErrorContextVisibility(conn, show_context);
+ //PQsetErrorVerbosity(conn, verbosity);
+ //PQsetErrorContextVisibility(conn, show_context);
const char* errmsg = PQerrorMessage(conn);
if (errmsg) m_errmsg = errmsg;
else m_errmsg.clear();
@@ -182,9 +182,19 @@
}
virtual const char* what() const NOEXCEPT override { return m_errmsg.data(); }
+ operator bool() const { return !m_errmsg.empty(); }
-private:
+protected:
std::string m_errmsg;
+};
+
+class timeout : public error
+{
+public:
+ timeout()
+ {
+ m_errmsg = "timeout";
+ }
};
inline void verify_pgtypes_error(int ret)
@@ -992,7 +1002,7 @@
if (end - data < size)
throw std::overflow_error("insufficient data left in message");
data = object_traits<T>::get(value, data, data + size);
- if (data >= end)
+ if (data > end)
throw std::overflow_error("insufficient data left in message");
result.push_back(value);
}
@@ -1494,6 +1504,17 @@
}
}
+ template<ExecStatusType... Excepted>
+ void verify_error(error& e)
+ {
+ if (m_res)
+ {
+ ExecStatusType got = status();
+ if (!in<ExecStatusType, Excepted...>(got))
+ e = error(m_res);
+ }
+ }
+
void clear()
{
if (m_res)
@@ -1517,15 +1538,27 @@
}
base_statement(const base_statement&) = delete;
base_statement(base_statement&& src)
- : m_conn(src.m_conn), m_binders(std::move(src.m_binders)), m_res(std::move(src.m_res))
+ : m_conn(src.m_conn), m_binders(std::move(src.m_binders)), m_res(std::move(src.m_res)), _name(std::move(src._name))
{
+ }
+ base_statement& operator=(const base_statement&) = delete;
+ base_statement& operator=(base_statement&& src)
+ {
+ if (this != &src)
+ {
+ close();
+ m_conn = src.m_conn;
+ m_binders = std::move(src.m_binders);
+ m_res = std::move(src.m_res);
+ }
+ return *this;
}
result& get_result() { return m_res; }
void close()
{
- m_res=nullptr;
+ m_res = nullptr;
}
uint64_t affetced_rows() const
@@ -1614,6 +1647,7 @@
protected:
PGconn* m_conn;
result m_res;
+ std::string _name;
std::vector<binder> m_binders;
template<ExecStatusType... Excepted>
@@ -1624,6 +1658,13 @@
else
throw error(m_conn);
}
+ void finish(result& res)
+ {
+ while (res)
+ {
+ res = PQgetResult(m_conn);
+ }
+ }
};
class statement : public base_statement
@@ -1633,7 +1674,7 @@
{
}
statement(const statement&) = delete;
- statement(statement&& src) : base_statement(std::move(src)), _name(std::move(src._name))
+ statement(statement&& src) : base_statement(std::move(src))
{
}
@@ -1761,17 +1802,6 @@
finish(m_res);
m_res.clear();
}
-
-private:
- std::string _name;
-
- void finish(result& res)
- {
- while (res)
- {
- res = PQgetResult(m_conn);
- }
- }
};
class base_database
@@ -1783,6 +1813,8 @@
}
public:
+ typedef postgres::error exception_type;
+
base_database(const base_database&) = delete;
base_database(base_database&& src)
{
@@ -1796,8 +1828,8 @@
PQfinish(m_conn);
}
- base_database& operator==(const base_database&) = delete;
- base_database& operator==(base_database&& src)
+ base_database& operator=(const base_database&) = delete;
+ base_database& operator=(base_database&& src)
{
if (this != &src)
{
@@ -1882,12 +1914,10 @@
PQreset(m_conn);
}
- bool is_alive()
+ void close()
{
- }
-
- PGPing ping()
- {
+ PQfinish(m_conn);
+ m_conn = nullptr;
}
protected:
@@ -1920,7 +1950,7 @@
m_res.get_column_type(j));
}
qtl::bind_record(*this, std::forward<decltype(values)>(values));
- proc(values);
+ qtl::detail::apply(proc, std::forward<decltype(values)>(values));
}
}
}
@@ -1959,12 +1989,6 @@
sprintf(port_text, "%u", port);
m_conn = PQsetdbLogin(host, port_text, options, nullptr, db, user, password);
return m_conn != nullptr && status() == CONNECTION_OK;
- }
-
- void close()
- {
- PQfinish(m_conn);
- m_conn = nullptr;
}
statement open_command(const char* query_text, size_t /*text_length*/)
@@ -2023,8 +2047,669 @@
simple_execute("COMMIT");
}
+ bool is_alive()
+ {
+ qtl::postgres::result res(PQexec(m_conn, ""));
+ return res && res.status() == PGRES_COMMAND_OK;
+ }
+
};
+inline int event_flags(PostgresPollingStatusType status)
+{
+ int flags = 0;
+ if (status == PGRES_POLLING_READING)
+ flags |= event::ef_read;
+ else if (status == PGRES_POLLING_WRITING)
+ flags |= event::ef_write;
+ else if (status == PGRES_POLLING_FAILED)
+ flags |= event::ef_exception;
+ return flags;
+}
+
+class async_connection;
+
+template<typename Handler>
+inline void async_wait(qtl::event* event, PGconn* conn, int timeout, Handler&& handler)
+{
+ int flushed = PQflush(conn);
+ if (flushed < 0)
+ {
+ handler(error(conn));
+ return;
+ }
+ if (flushed == 1)
+ {
+ event->set_io_handler(qtl::event::ef_read | qtl::event::ef_write, timeout,
+ [event, conn, timeout, handler](int flags) mutable {
+ if (flags&qtl::event::ef_timeout)
+ {
+ handler(postgres::timeout());
+ return;
+ }
+ if (flags&qtl::event::ef_read)
+ {
+ if (!PQconsumeInput(conn))
+ {
+ handler(error(conn));
+ return;
+ }
+ }
+ if (flags&(qtl::event::ef_read | qtl::event::ef_write | event::ef_exception))
+ async_wait(event, conn, timeout, handler);
+ });
+ }
+ else
+ {
+ event->set_io_handler(qtl::event::ef_read, 10,
+ [event, conn, timeout, handler](int flags) mutable {
+ if (flags&qtl::event::ef_timeout)
+ {
+ handler(postgres::timeout());
+ }
+ else if (flags&(qtl::event::ef_read | qtl::event::ef_exception))
+ {
+ if (PQconsumeInput(conn))
+ {
+ if (!PQisBusy(conn))
+ handler(postgres::error());
+ else
+ async_wait(event, conn, timeout, handler);
+ }
+ else
+ {
+ handler(postgres::error(conn));
+ }
+ }
+ else
+ {
+ handler(postgres::error(conn));
+ }
+ });
+ }
+}
+
+class async_statement : public base_statement
+{
+public:
+ async_statement(async_connection& db);
+ async_statement(async_statement&& src)
+ : base_statement(std::move(src)), m_timeout(2)
+ {
+ m_event = src.m_event;
+ m_timeout = src.m_timeout;
+ src.m_event = nullptr;
+ }
+ async_statement& operator=(async_statement&& src)
+ {
+ if (this != &src)
+ {
+ base_statement::operator =(std::move(src));
+ m_event = src.m_event;
+ m_timeout = src.m_timeout;
+ src.m_event = nullptr;
+ }
+ return *this;
+ }
+ ~async_statement()
+ {
+ close();
+ }
+
+ /*
+ Handler defiens as:
+ void handler(const qtl::mysql::error& e);
+ */
+ template<typename Handler>
+ void open(Handler&& handler, const char* command, int nParams = 0, const Oid *paramTypes = nullptr)
+ {
+ _name.resize(sizeof(intptr_t) * 2 + 1);
+ int n = sprintf(const_cast<char*>(_name.data()), "q%p", this);
+ _name.resize(n);
+ std::transform(_name.begin(), _name.end(), _name.begin(), tolower);
+ if (PQsendPrepare(m_conn, _name.data(), command, nParams, paramTypes))
+ {
+ async_wait([this, handler](error e) mutable {
+ if (!e)
+ {
+ m_res = PQgetResult(m_conn);
+ if (m_res)
+ {
+ m_res.verify_error<PGRES_COMMAND_OK>(e);
+ while(m_res)
+ m_res = PQgetResult(m_conn);
+ }
+ }
+ handler(e);
+ });
+ }
+ else
+ {
+ _name.clear();
+ handler(error(m_conn));
+ }
+ }
+
+ template<typename Handler, typename... Types>
+ void open(Handler&& handler, const char* command)
+ {
+ auto binder_list = make_binder_list(Types()...);
+ std::array<Oid, sizeof...(Types)> types;
+ std::transform(binder_list.begin(), binder_list.end(), types.begin(), [](const binder& b) {
+ return b.type();
+ });
+
+ open(std::forward<Handler>(handler), command, types.size(), types.data());
+ }
+
+ void close()
+ {
+ while (m_res)
+ {
+ m_res = PQgetResult(m_conn);
+ }
+
+ if (!_name.empty())
+ {
+ std::ostringstream oss;
+ oss << "DEALLOCATE " << _name << ";";
+ result res = PQexec(m_conn, oss.str().data());
+ error e;
+ res.verify_error<PGRES_COMMAND_OK, PGRES_TUPLES_OK>(e);
+ finish(res);
+ if(e) throw e;
+ }
+ base_statement::close();
+ }
+
+ template<typename Handler>
+ void close(Handler&& handler)
+ {
+ while (m_res)
+ {
+ if(PQisBusy(m_conn))
+ {
+ async_wait([this, handler](const error& e) mutable {
+ close(handler);
+ });
+ }
+ else
+ {
+ m_res = PQgetResult(m_conn);
+ }
+ }
+
+ if (!_name.empty() && PQstatus(m_conn) == CONNECTION_OK)
+ {
+ std::ostringstream oss;
+ oss << "DEALLOCATE " << _name << ";";
+ bool ok = PQsendQuery(m_conn, oss.str().data());
+ if (ok)
+ {
+ async_wait([this, handler](postgres::error e) mutable {
+ if (PQstatus(m_conn) == CONNECTION_OK)
+ {
+ result res(PQgetResult(m_conn));
+ if (res)
+ res.verify_error<PGRES_COMMAND_OK, PGRES_TUPLES_OK>(e);
+ if (!e) _name.clear();
+ finish(res);
+ handler(e);
+ }
+ else
+ {
+ _name.clear();
+ handler(error());
+ }
+ });
+ }
+ else
+ {
+ handler(error(m_conn));
+ }
+ }
+ else
+ {
+ _name.clear();
+ }
+ }
+
+ /*
+ ExecuteHandler defiens as:
+ void handler(const qtl::mysql::error& e, uint64_t affected);
+ */
+ template<typename ExecuteHandler>
+ void execute(ExecuteHandler&& handler)
+ {
+ if (PQsendQueryPrepared(m_conn, _name.data(), 0, nullptr, nullptr, nullptr, 1) &&
+ PQsetSingleRowMode(m_conn))
+ {
+ async_wait([this, handler](error e) {
+ if (!e)
+ {
+ m_res = PQgetResult(m_conn);
+ m_res.verify_error<PGRES_COMMAND_OK, PGRES_SINGLE_TUPLE>(e);
+ finish(m_res);
+ }
+ handler(e);
+ });
+ }
+ else
+ {
+ handler(error(m_conn));
+ }
+ }
+
+ template<typename Types, typename Handler>
+ void execute(const Types& params, Handler&& handler)
+ {
+ const size_t count = qtl::params_binder<statement, Types>::size;
+ if (count > 0)
+ {
+ m_binders.resize(count);
+ qtl::bind_params(*this, params);
+
+ std::array<const char*, count> values;
+ std::array<int, count> lengths;
+ std::array<int, count> formats;
+ for (size_t i = 0; i != m_binders.size(); i++)
+ {
+ values[i] = m_binders[i].value();
+ lengths[i] = static_cast<int>(m_binders[i].length());
+ formats[i] = 1;
+ }
+ if (!PQsendQueryPrepared(m_conn, _name.data(), static_cast<int>(m_binders.size()), values.data(), lengths.data(), formats.data(), 1))
+ {
+ handler(error(m_conn), 0);
+ return;
+ }
+ }
+ else
+ {
+ if (!PQsendQueryPrepared(m_conn, _name.data(), 0, nullptr, nullptr, nullptr, 1))
+ {
+ handler(error(m_conn), 0);
+ return;
+ }
+ }
+ if (!PQsetSingleRowMode(m_conn))
+ {
+ handler(error(m_conn), 0);
+ return;
+ }
+ if (PQisBusy(m_conn))
+ {
+ async_wait([this, handler](error e) mutable {
+ if (!e)
+ {
+ m_res = PQgetResult(m_conn);
+ m_res.verify_error<PGRES_COMMAND_OK, PGRES_SINGLE_TUPLE>(e);
+ int64_t affected = m_res.affected_rows();
+ finish(m_res);
+ handler(e, affected);
+ }
+ else
+ {
+ handler(e, 0);
+ }
+ });
+ }
+ }
+
+ template<typename Types, typename RowHandler, typename FinishHandler>
+ void fetch(Types&& values, RowHandler&& row_handler, FinishHandler&& finish_handler)
+ {
+ if (m_res)
+ {
+ ExecStatusType status = m_res.status();
+ if (status == PGRES_SINGLE_TUPLE)
+ {
+ int count = m_res.get_column_count();
+ if (count > 0)
+ {
+ m_binders.resize(count);
+ for (int i = 0; i != count; i++)
+ {
+ m_binders[i] = binder(m_res.get_value(0, i), m_res.length(0, i),
+ m_res.get_column_type(i));
+ }
+ qtl::bind_record(*this, std::forward<Types>(values));
+ }
+ row_handler();
+ if (PQisBusy(m_conn))
+ {
+ async_wait([this, &values, row_handler, finish_handler](const error& e) {
+ if (e)
+ {
+ finish_handler(e);
+ }
+ else
+ {
+ m_res = PQgetResult(m_conn);
+ fetch(std::forward<Types>(values), row_handler, finish_handler);
+ }
+ });
+ }
+ else
+ {
+ m_res = PQgetResult(m_conn);
+ fetch(std::forward<Types>(values), row_handler, finish_handler);
+ }
+ }
+ else
+ {
+ error e;
+ m_res.verify_error<PGRES_TUPLES_OK>(e);
+ finish_handler(e);
+ }
+ }
+ else
+ {
+ finish_handler(error());
+ }
+ }
+
+ template<typename Handler>
+ void next_result(Handler&& handler)
+ {
+ async_wait([this, handler](const error& e) {
+ if (e)
+ {
+ handler(e);
+ }
+ else
+ {
+ m_res = PQgetResult(m_conn);
+ handler(error());
+ }
+ });
+ }
+
+private:
+ event* m_event;
+ int m_timeout;
+ template<typename Handler>
+ void async_wait(Handler&& handler)
+ {
+ qtl::postgres::async_wait(m_event, m_conn, m_timeout, std::forward<Handler>(handler));
+ }
+};
+
+class async_connection : public base_database, public qtl::async_connection<async_connection, async_statement>
+{
+public:
+ async_connection() : m_connect_timeout(2), m_query_timeout(2)
+ {
+ }
+ async_connection(async_connection&& src)
+ : base_database(std::move(src)), m_connect_timeout(src.m_connect_timeout), m_query_timeout(src.m_query_timeout)
+ {
+ }
+ async_connection& operator=(async_connection&& src)
+ {
+ if (this != &src)
+ {
+ base_database::operator=(std::move(src));
+ m_connect_timeout = src.m_connect_timeout;
+ m_query_timeout = src.m_query_timeout;
+ }
+ return *this;
+ }
+
+ /*
+ OpenHandler defines as:
+ void handler(const qtl::postgres::error& e) NOEXCEPT;
+ */
+ template<typename EventLoop, typename OpenHandler>
+ void open(EventLoop& ev, OpenHandler&& handler, const std::map<std::string, std::string>& params, bool expand_dbname = false)
+ {
+ std::vector<const char*> keywords;
+ std::vector<const char*> values;
+ keywords.reserve(params.size());
+ values.reserve(params.size());
+ for (auto& param : params)
+ {
+ keywords.push_back(param.first.data());
+ values.push_back(param.second.data());
+ }
+ keywords.push_back(nullptr);
+ values.push_back(nullptr);
+ m_conn = PQconnectStartParams(keywords.data(), values.data(), expand_dbname);
+ if (m_conn == nullptr)
+ throw std::bad_alloc();
+ if (status() == CONNECTION_BAD)
+ {
+ handler(error(m_conn));
+ return;
+ }
+
+ if (PQsetnonblocking(m_conn, true)!=0)
+ handler(error(m_conn));
+ get_options();
+ bind(ev);
+ wait_connect(std::forward<OpenHandler>(handler));
+ }
+
+ template<typename EventLoop, typename OpenHandler>
+ void open(EventLoop& ev, OpenHandler&& handler, const char * conninfo)
+ {
+ m_conn = PQconnectStart(conninfo);
+ if (m_conn == nullptr)
+ throw std::bad_alloc();
+ if (status() == CONNECTION_BAD)
+ {
+ handler(error(m_conn));
+ return;
+ }
+
+ PQsetnonblocking(m_conn, true);
+ get_options();
+ bind(ev);
+ wait_connect(std::forward<OpenHandler>(handler));
+ }
+
+ template<typename OpenHandler>
+ void reset(OpenHandler&& handler)
+ {
+ PQresetStart(m_conn);
+ wait_reset(std::forward<OpenHandler>(handler));
+ }
+
+ /*
+ Handler defines as:
+ void handler(const qtl::mysql::error& e, uint64_t affected) NOEXCEPT;
+ */
+ template<typename ExecuteHandler>
+ void simple_execute(ExecuteHandler&& handler, const char* query_text) NOEXCEPT
+ {
+ bool ok = PQsendQuery(m_conn, query_text);
+ if (ok)
+ {
+ async_wait([this, handler](postgres::error e) mutable {
+ result res(PQgetResult(m_conn));
+ res.verify_error<PGRES_COMMAND_OK, PGRES_TUPLES_OK>(e);
+ uint64_t affected = res.affected_rows();
+ handler(e, affected);
+ while (res)
+ res = PQgetResult(m_conn);
+ });
+ }
+ else
+ {
+ handler(error(m_conn), 0);
+ }
+ }
+
+ template<typename Handler>
+ void auto_commit(Handler&& handler, bool on) NOEXCEPT
+ {
+ simple_execute(std::forward<Handler>(handler),
+ on ? "SET AUTOCOMMIT TO ON" : "SET AUTOCOMMIT TO OFF");
+ }
+
+ template<typename Handler>
+ void begin_transaction(Handler&& handler) NOEXCEPT
+ {
+ simple_execute(std::forward<Handler>(handler), "BEGIN");
+ }
+
+ template<typename Handler>
+ void rollback(Handler&& handler) NOEXCEPT
+ {
+ simple_execute(std::forward<Handler>(handler), "ROLLBACK");
+ }
+
+ template<typename Handler>
+ void commit(Handler&& handler) NOEXCEPT
+ {
+ simple_execute(std::forward<Handler>(handler), "COMMIT");
+ }
+
+ /*
+ ResultHandler defines as:
+ void result_handler(const qtl::postgres::error& e) NOEXCEPT;
+ */
+ template<typename RowHandler, typename ResultHandler>
+ void simple_query(const char* query, RowHandler&& row_handler, ResultHandler&& result_handler) NOEXCEPT
+ {
+ bool ok = PQsendQuery(m_conn, query);
+ if (ok)
+ {
+ async_wait([this, row_handler, result_handler](postgres::error e) mutable {
+ result res(PQgetResult(m_conn));
+ res.verify_error<PGRES_COMMAND_OK, PGRES_TUPLES_OK>(e);
+ if (e)
+ {
+ result_handler(e, 0);
+ return;
+ }
+ uint64_t affected = res.affected_rows();
+ while (res && res.status() == PGRES_TUPLES_OK)
+ {
+ simple_statment stmt(*this, std::move(res));
+ stmt.fetch_all(row_handler);
+ res = PQgetResult(m_conn);
+ }
+ result_handler(e, affected);
+ });
+ }
+ else
+ {
+ result_handler(error(m_conn), 0);
+ }
+ }
+
+ template<typename Handler>
+ void open_command(const char* query_text, size_t /*text_length*/, Handler&& handler)
+ {
+ std::shared_ptr<async_statement> stmt = std::make_shared<async_statement>(*this);
+ stmt->open([stmt, handler](const postgres::error& e) mutable {
+ handler(e, stmt);
+ }, query_text, 0);
+ }
+
+ template<typename Handler>
+ bool is_alive(Handler&& handler) NOEXCEPT
+ {
+ simple_execute(std::forward<Handler>(handler), "");
+ }
+
+ socket_type socket() const NOEXCEPT { return PQsocket(m_conn); }
+
+ int connect_timeout() const { return m_connect_timeout; }
+ void connect_timeout(int timeout) { m_connect_timeout = timeout; }
+ int query_timeout() const { return m_query_timeout; }
+ void query_timeout(int timeout) { m_query_timeout = timeout; }
+
+private:
+ int m_connect_timeout;
+ int m_query_timeout;
+
+ void get_options()
+ {
+ PQconninfoOption* options = PQconninfo(m_conn);
+ m_connect_timeout = 2;
+ for (PQconninfoOption* option = options; option; option++)
+ {
+ if (strcmp(option->keyword, "connect_timeout") == 0)
+ {
+ if (option->val)
+ m_connect_timeout = atoi(option->val);
+ break;
+ }
+ }
+ PQconninfoFree(options);
+ }
+
+ template<typename OpenHandler>
+ void wait_connect(OpenHandler&& handler) NOEXCEPT
+ {
+ PostgresPollingStatusType status = PQconnectPoll(m_conn);
+ switch (status)
+ {
+ case PGRES_POLLING_READING:
+ case PGRES_POLLING_WRITING:
+ m_event_handler->set_io_handler(event_flags(status), m_connect_timeout,
+ [this, handler](int flags) mutable {
+ if (flags&event::ef_timeout)
+ {
+ handler(postgres::timeout());
+ }
+ else if(flags&(event::ef_read|event::ef_write | event::ef_exception))
+ wait_connect(std::forward<OpenHandler>(handler));
+ });
+ break;
+ case PGRES_POLLING_FAILED:
+ handler(postgres::error(handle()));
+ break;
+ case PGRES_POLLING_OK:
+ //PQsetnonblocking(m_conn, true);
+ handler(postgres::error());
+ }
+ }
+
+ template<typename OpenHandler>
+ void wait_reset(OpenHandler&& handler) NOEXCEPT
+ {
+ PostgresPollingStatusType status = PQresetPoll(m_conn);
+ switch (status)
+ {
+ case PGRES_POLLING_READING:
+ case PGRES_POLLING_WRITING:
+ m_event_handler->set_io_handler(event_flags(status), m_connect_timeout,
+ [this, handler](int flags) mutable {
+ if (flags&event::ef_timeout)
+ {
+ handler(postgres::timeout());
+ }
+ else if (flags&(event::ef_read | event::ef_write | event::ef_exception))
+ wait_reset(std::forward<OpenHandler>(handler));
+ });
+ break;
+ case PGRES_POLLING_FAILED:
+ handler(postgres::error(m_conn));
+ break;
+ case PGRES_POLLING_OK:
+ handler(postgres::error());
+ }
+ }
+
+ template<typename Handler>
+ void async_wait(Handler&& handler)
+ {
+ qtl::postgres::async_wait(event(), m_conn, m_query_timeout, std::forward<Handler>(handler));
+ }
+
+};
+
+inline async_statement::async_statement(async_connection& db)
+ : base_statement(static_cast<base_database&>(db))
+{
+ m_event = db.event();
+ m_timeout = db.query_timeout();
+}
+
+
typedef qtl::transaction<database> transaction;
template<typename Record>
diff --git a/include/qtl_postgres_pool.hpp b/include/qtl_postgres_pool.hpp
index 5f8b59c..815ee3c 100644
--- a/include/qtl_postgres_pool.hpp
+++ b/include/qtl_postgres_pool.hpp
@@ -38,6 +38,32 @@
std::string m_password;
};
+template<typename EventLoop>
+class async_pool : public qtl::async_pool<async_pool<EventLoop>, EventLoop, async_connection>
+{
+ typedef qtl::async_pool<async_pool<EventLoop>, EventLoop, async_connection> base_class;
+public:
+ async_pool(EventLoop& ev) : base_class(ev) { }
+ virtual ~async_pool() { }
+
+ template<typename Handler>
+ void new_connection(EventLoop& ev, Handler&& handler) throw()
+ {
+ async_connection* db = new async_connection;
+ db->open(ev, [this, handler, db](const postgres::error& e) mutable {
+ if (e)
+ {
+ delete db;
+ db = nullptr;
+ }
+ handler(e, db);
+ }, m_params);
+ }
+
+protected:
+ std::map<std::string, std::string> m_params;
+};
+
}
}
diff --git a/test/AsyncPostgres.cpp b/test/AsyncPostgres.cpp
new file mode 100644
index 0000000..67ab021
--- /dev/null
+++ b/test/AsyncPostgres.cpp
@@ -0,0 +1,150 @@
+#include "stdafx.h"
+#include "../include/qtl_postgres.hpp"
+#include <vector>
+#include <thread>
+#include <system_error>
+#include <time.h>
+#include <limits.h>
+#include "../include/qtl_postgres_pool.hpp"
+#include "../include/qtl_asio.hpp"
+
+using namespace qtl::postgres;
+
+void LogError(const error& e)
+{
+ fprintf(stderr, "PostgreSQL Error: %s\n", e.what());
+}
+
+const char postgres_server[] = "localhost";
+const char postgres_user[] = "postgres";
+const char postgres_password[] = "111111";
+const char postgres_database[] = "test";
+
+qtl::asio::service service;
+
+void SimpleTest()
+{
+ async_connection connection;
+ service.reset();
+ std::map<std::string, std::string> params;
+ params["host"] = postgres_server;
+ params["dbname"] = postgres_database;
+ params["user"] = postgres_user;
+ params["password"] = postgres_password;
+ connection.open(service, [&connection](const error& e) {
+ if (e)
+ {
+ LogError(e);
+ service.stop();
+ }
+ else
+ {
+ printf("Connect to PostgreSQL ok.\n");
+ connection.simple_query("select id, name from test", [](const std::string& id, const std::string& name) {
+ printf("%s\t%s\n", id.data(), name.data());
+ return true;
+ }, [&connection](const error& e, size_t row_count) {
+ if (e)
+ LogError(e);
+ else
+ printf("Total %lu rows.\n", row_count);
+
+ connection.close();
+ });
+ }
+ }, params);
+
+ service.run();
+}
+
+void insert(async_connection& connection, int next)
+{
+ qtl::asio::async_execute(connection, [&connection, next](const error& e, uint64_t affected) mutable {
+ if (e)
+ {
+ LogError(e);
+ service.stop();
+ }
+ else
+ {
+ if (next)
+ {
+ --next;
+ asio::post(service.context(), [&connection, next]() {
+ insert(connection, next);
+ });
+ }
+ else
+ {
+ service.stop();
+ }
+ }
+ }, "insert into test(name, createtime) values($1, LOCALTIMESTAMP)", 0, "test name");
+}
+
+void ExecuteTest()
+{
+ async_connection connection;
+ service.reset();
+ std::map<std::string, std::string> params;
+ params["host"] = postgres_server;
+ params["dbname"] = postgres_database;
+ params["user"] = postgres_user;
+ params["password"] = postgres_password;
+ qtl::asio::async_open(service, connection, [&connection](const error& e) {
+ if (e)
+ {
+ LogError(e);
+ service.stop();
+ }
+ else
+ {
+ printf("Connect to PostgreSQL ok.\n");
+ insert(connection, 10);
+ }
+ }, params);
+
+ service.run();
+};
+
+void QueryTest()
+{
+ async_connection connection;
+ service.reset();
+ std::map<std::string, std::string> params;
+ params["host"] = postgres_server;
+ params["dbname"] = postgres_database;
+ params["user"] = postgres_user;
+ params["password"] = postgres_password;
+ connection.open(service, [&connection](const error& e) {
+ if (e)
+ {
+ LogError(e);
+ service.stop();
+ }
+ else
+ {
+ printf("Connect to PostgreSQL ok.\n");
+ connection.query("select id, name, CreateTime from test",
+ [](int32_t id, const std::string& name, const qtl::postgres::timestamp& create_time) {
+ printf("%d\t%s\t%s\n", id, name.data(), create_time.to_string().data());
+ }, [&connection](const error& e) {
+ printf("query has completed.\n");
+ if (e)
+ LogError(e);
+
+ connection.close();
+ });
+ }
+ }, params);
+
+ service.run();
+}
+
+int main(int argc, char* argv[])
+{
+ ExecuteTest();
+ SimpleTest();
+ QueryTest();
+ return 0;
+}
diff --git a/test/TestPostgres.cpp b/test/TestPostgres.cpp
index eeab3f2..60ee70a 100644
--- a/test/TestPostgres.cpp
+++ b/test/TestPostgres.cpp
@@ -73,6 +73,10 @@
{
ASSERT_EXCEPTION(e);
}
+ catch (std::exception& e)
+ {
+ ASSERT_EXCEPTION(e);
+ }
}
void TestPostgres::test_select()
@@ -83,7 +87,7 @@
try
{
db.query("select * from test where id=$1", 0, id,
- [](const qtl::indicator<int32_t>& id, const std::string& name, const qtl::postgres::timestamp& create_time) {
+ [](const qtl::indicator<int32_t>& id, const std::string& name, const qtl::postgres::timestamp& create_time, const std::vector<int32_t>& v, const std::tuple<int, int>& pt) {
printf("ID=\"%d\", Name=\"%s\"\n",
id.data, name.data());
});
@@ -101,6 +105,10 @@
{
ASSERT_EXCEPTION(e);
}
+ catch (std::exception& e)
+ {
+ ASSERT_EXCEPTION(e);
+ }
}
void TestPostgres::test_insert()
@@ -110,10 +118,15 @@
try
{
- db.query_first("insert into test(Name, CreateTime) values($1, LOCALTIMESTAMP) returning ID",
- "test_user", id);
+ int32_t va[] = { 200, 300, 400 };
+ db.query_first("insert into test(Name, CreateTime, va, percent) values($1, LOCALTIMESTAMP, $2, $3) returning ID",
+ std::forward_as_tuple("test_user", va, std::make_pair(11, 22)), id);
}
catch (qtl::postgres::error& e)
+ {
+ ASSERT_EXCEPTION(e);
+ }
+ catch (std::exception& e)
{
ASSERT_EXCEPTION(e);
}
@@ -136,6 +149,10 @@
{
ASSERT_EXCEPTION(e);
}
+ catch (std::exception& e)
+ {
+ ASSERT_EXCEPTION(e);
+ }
}
void TestPostgres::test_update()
@@ -152,7 +169,10 @@
{
ASSERT_EXCEPTION(e);
}
- TEST_ASSERT_MSG(id > 0, "insert failture.");
+ catch (std::exception& e)
+ {
+ ASSERT_EXCEPTION(e);
+ }
}
void TestPostgres::test_clear()
@@ -165,6 +185,10 @@
db.simple_execute("delete from test");
}
catch (qtl::postgres::error& e)
+ {
+ ASSERT_EXCEPTION(e);
+ }
+ catch (std::exception& e)
{
ASSERT_EXCEPTION(e);
}
@@ -185,6 +209,10 @@
}
}
catch (qtl::postgres::error& e)
+ {
+ ASSERT_EXCEPTION(e);
+ }
+ catch (std::exception& e)
{
ASSERT_EXCEPTION(e);
}
@@ -233,6 +261,10 @@
{
ASSERT_EXCEPTION(e);
}
+ catch (std::bad_cast& e)
+ {
+ ASSERT_EXCEPTION(e);
+ }
}
void TestPostgres::test_select_blob()
@@ -271,6 +303,10 @@
{
ASSERT_EXCEPTION(e);
}
+ catch (std::bad_cast& e)
+ {
+ ASSERT_EXCEPTION(e);
+ }
}
void TestPostgres::test_any()
diff --git a/test/test_postgres.mak b/test/test_postgres.mak
index 337a635..e6f6e7a 100644
--- a/test/test_postgres.mak
+++ b/test/test_postgres.mak
@@ -1,24 +1,30 @@
-TARGET=test_postgres
+TARGET=test_postgres async_postgres
CC=g++
PCH_HEADER=stdafx.h
PCH=stdafx.h.gch
-OBJ=TestPostgres.o md5.o
-CFLAGS=-g -D_DEBUG -O2-I/usr/include -I/usr/local/include -I$(shell pg_config --includedir) -I$(shell pg_config --includedir-server )
+OBJ=TestPostgres.o AsyncPostgres.o md5.o
+CFLAGS=-g -D_DEBUG -O2 -I/usr/include -I/usr/local/include -I$(shell pg_config --includedir) -I$(shell pg_config --includedir-server )
CXXFLAGS= -I../include -std=c++11
-LDFLAGS= -L$(shell pg_config --libdir) -lcpptest -lpq -lpgtypes
+LDFLAGS= -L$(shell pg_config --libdir) -pthread -lcpptest -lpq -lpgtypes
all : $(TARGET)
$(PCH) : $(PCH_HEADER)
$(CC) $(CFLAGS) $(CXXFLAGS) -x c++-header -o $@ $<
-TestPostgres.o : TestPostgres.cpp $(PCH)
+TestPostgres.o : TestPostgres.cpp TestPostgres.h $(PCH)
+ $(CC) -c $(CFLAGS) $(CXXFLAGS) -o $@ $<
+
+AsyncPostgres.o : AsyncPostgres.cpp $(PCH)
$(CC) -c $(CFLAGS) $(CXXFLAGS) -o $@ $<
md5.o : md5.c md5.h
gcc -c $(CFLAGS) -o $@ $<
-$(TARGET) : $(OBJ)
+test_postgres : TestPostgres.o md5.o
+ libtool --tag=CXX --mode=link $(CC) $(LDFLAGS) -o $@ $^
+
+async_postgres : AsyncPostgres.o md5.o
libtool --tag=CXX --mode=link $(CC) $(LDFLAGS) -o $@ $^
clean:
--
Gitblit v1.9.3