From 9b81cdebba95f1b6e687191c630c1f8f6cf0df16 Mon Sep 17 00:00:00 2001
From: znone <glyc@sina.com.cn>
Date: Thu, 25 Mar 2021 12:44:30 +0000
Subject: [PATCH] PostgreSQL: Support asynchronous operation Fix bug in different versions of asio

---
 include/qtl_postgres.hpp |  753 +++++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 files changed, 719 insertions(+), 34 deletions(-)

diff --git a/include/qtl_postgres.hpp b/include/qtl_postgres.hpp
index 4bc2148..3418c22 100644
--- a/include/qtl_postgres.hpp
+++ b/include/qtl_postgres.hpp
@@ -13,6 +13,7 @@
 #include <algorithm>
 #include <assert.h>
 #include "qtl_common.hpp"
+#include "qtl_async.hpp"
 
 #define FRONTEND
 
@@ -26,7 +27,6 @@
 
 extern "C"
 {
-#include <c.h>
 #include <postgres.h>
 #include <catalog/pg_type.h>
 }
@@ -164,11 +164,11 @@
 class error : public std::exception
 {
 public:
-	error() : m_errmsg(nullptr) { }
+	error() : m_errmsg() { }
 	explicit error(PGconn* conn, PGVerbosity verbosity = PQERRORS_DEFAULT, PGContextVisibility show_context = PQSHOW_CONTEXT_ERRORS)
 	{
-		PQsetErrorVerbosity(conn, verbosity);
-		PQsetErrorContextVisibility(conn, show_context);
+		//PQsetErrorVerbosity(conn, verbosity);
+		//PQsetErrorContextVisibility(conn, show_context);
 		const char* errmsg = PQerrorMessage(conn);
 		if (errmsg) m_errmsg = errmsg;
 		else m_errmsg.clear();
@@ -182,9 +182,19 @@
 	}
 
 	virtual const char* what() const NOEXCEPT override { return m_errmsg.data(); }
+	operator bool() const { return !m_errmsg.empty(); }
 
-private:
+protected:
 	std::string m_errmsg;
+};
+
+class timeout : public error
+{
+public:
+	timeout()
+	{
+		m_errmsg = "timeout";
+	}
 };
 
 inline void verify_pgtypes_error(int ret)
@@ -992,7 +1002,7 @@
 			if (end - data < size)
 				throw std::overflow_error("insufficient data left in message");
 			data = object_traits<T>::get(value, data, data + size);
-			if (data >= end)
+			if (data > end)
 				throw std::overflow_error("insufficient data left in message");
 			result.push_back(value);
 		}
@@ -1494,6 +1504,17 @@
 		}
 	}
 
+	template<ExecStatusType... Excepted>
+	void verify_error(error& e)
+	{
+		if (m_res)
+		{
+			ExecStatusType got = status();
+			if (!in<ExecStatusType, Excepted...>(got))
+				e = error(m_res);
+		}
+	}
+
 	void clear()
 	{
 		if (m_res)
@@ -1517,15 +1538,27 @@
 	 }
 	 base_statement(const base_statement&) = delete;
 	 base_statement(base_statement&& src) 
-		 : m_conn(src.m_conn), m_binders(std::move(src.m_binders)), m_res(std::move(src.m_res))
+		 : m_conn(src.m_conn), m_binders(std::move(src.m_binders)), m_res(std::move(src.m_res)), _name(std::move(src._name))
 	 {
+	 }
+	 base_statement& operator=(const base_statement&) = delete;
+	 base_statement& operator=(base_statement&& src)
+	 {
+		 if (this != &src)
+		 {
+			 close();
+			 m_conn = src.m_conn;
+			 m_binders = std::move(src.m_binders);
+			 m_res = std::move(src.m_res);
+		 }
+		 return *this;
 	 }
 
 	result& get_result() { return m_res; }
 
 	void close()
 	{
-		m_res=nullptr;
+		m_res = nullptr;
 	}
 
 	uint64_t affetced_rows() const
@@ -1614,6 +1647,7 @@
 protected:
 	PGconn* m_conn;
 	result m_res;
+	std::string _name;
 	std::vector<binder> m_binders;
 
 	template<ExecStatusType... Excepted>
@@ -1624,6 +1658,13 @@
 		else
 			throw error(m_conn);
 	}
+	void finish(result& res)
+	{
+		while (res)
+		{
+			res = PQgetResult(m_conn);
+		}
+	}
 };
 
 class statement : public base_statement
@@ -1633,7 +1674,7 @@
 	{
 	}
 	statement(const statement&) = delete;
-	statement(statement&& src) : base_statement(std::move(src)), _name(std::move(src._name))
+	statement(statement&& src) : base_statement(std::move(src))
 	{
 	}
 
@@ -1761,17 +1802,6 @@
 		finish(m_res);
 		m_res.clear();
 	}
-
-private:
-	std::string _name;
-
-	void finish(result& res)
-	{
-		while (res)
-		{
-			res = PQgetResult(m_conn);
-		}
-	}
 };
 
 class base_database
@@ -1783,6 +1813,8 @@
 	}
 
 public:
+	typedef postgres::error exception_type;
+
 	base_database(const base_database&) = delete;
 	base_database(base_database&& src)
 	{
@@ -1796,8 +1828,8 @@
 			PQfinish(m_conn);
 	}
 
-	base_database& operator==(const base_database&) = delete;
-	base_database& operator==(base_database&& src)
+	base_database& operator=(const base_database&) = delete;
+	base_database& operator=(base_database&& src)
 	{
 		if (this != &src)
 		{
@@ -1882,12 +1914,10 @@
 			PQreset(m_conn);
 	}
 
-	bool is_alive()
+	void close()
 	{
-	}
-
-	PGPing ping()
-	{
+		PQfinish(m_conn);
+		m_conn = nullptr;
 	}
 
 protected:
@@ -1920,7 +1950,7 @@
 						m_res.get_column_type(j));
 				}
 				qtl::bind_record(*this, std::forward<decltype(values)>(values));
-				proc(values);
+				qtl::detail::apply(proc, std::forward<decltype(values)>(values));
 			}
 		}
 	}
@@ -1959,12 +1989,6 @@
 		sprintf(port_text, "%u", port);
 		m_conn = PQsetdbLogin(host, port_text, options, nullptr, db, user, password);
 		return m_conn != nullptr && status() == CONNECTION_OK;
-	}
-
-	void close()
-	{
-		PQfinish(m_conn);
-		m_conn = nullptr;
 	}
 
 	statement open_command(const char* query_text, size_t /*text_length*/)
@@ -2023,8 +2047,669 @@
 		simple_execute("COMMIT");
 	}
 
+	bool is_alive()
+	{
+		qtl::postgres::result res(PQexec(m_conn, ""));
+		return res && res.status() == PGRES_COMMAND_OK;
+	}
+
 };
 
+inline int event_flags(PostgresPollingStatusType status)
+{
+	int flags = 0;
+	if (status == PGRES_POLLING_READING)
+		flags |= event::ef_read;
+	else if (status == PGRES_POLLING_WRITING)
+		flags |= event::ef_write;
+	else if (status == PGRES_POLLING_FAILED)
+		flags |= event::ef_exception;
+	return flags;
+}
+
+class async_connection;
+
+template<typename Handler>
+inline void async_wait(qtl::event* event, PGconn* conn, int timeout, Handler&& handler)
+{
+	int flushed = PQflush(conn);
+	if (flushed < 0)
+	{
+		handler(error(conn));
+		return;
+	}
+	if (flushed == 1)
+	{
+		event->set_io_handler(qtl::event::ef_read | qtl::event::ef_write, timeout,
+			[event, conn, timeout, handler](int flags) mutable {
+			if (flags&qtl::event::ef_timeout)
+			{
+				handler(postgres::timeout());
+				return;
+			}
+			if (flags&qtl::event::ef_read)
+			{
+				if (!PQconsumeInput(conn))
+				{
+					handler(error(conn));
+					return;
+				}
+			}
+			if (flags&(qtl::event::ef_read | qtl::event::ef_write | event::ef_exception))
+				async_wait(event, conn, timeout, handler);
+		});
+	}
+	else
+	{
+		event->set_io_handler(qtl::event::ef_read, 10,
+			[event, conn, timeout, handler](int flags) mutable {
+			if (flags&qtl::event::ef_timeout)
+			{
+				handler(postgres::timeout());
+			}
+			else if (flags&(qtl::event::ef_read | qtl::event::ef_exception))
+			{
+				if (PQconsumeInput(conn))
+				{
+					if (!PQisBusy(conn))
+						handler(postgres::error());
+					else
+						async_wait(event, conn, timeout, handler);
+				}
+				else
+				{
+					handler(postgres::error(conn));
+				}
+			}
+			else
+			{
+				handler(postgres::error(conn));
+			}
+		});
+	}
+}
+
+class async_statement : public base_statement
+{
+public:
+	async_statement(async_connection& db);
+	async_statement(async_statement&& src)
+		: base_statement(std::move(src)), m_timeout(2)
+	{
+		m_event = src.m_event;
+		m_timeout = src.m_timeout;
+		src.m_event = nullptr;
+	}
+	async_statement& operator=(async_statement&& src)
+	{
+		if (this != &src)
+		{
+			base_statement::operator =(std::move(src));
+			m_event = src.m_event;
+			m_timeout = src.m_timeout;
+			src.m_event = nullptr;
+		}
+		return *this;
+	}
+	~async_statement()
+	{
+		close();
+	}
+
+	/*
+		Handler defiens as:
+		void handler(const qtl::mysql::error& e);
+	 */
+	template<typename Handler>
+	void open(Handler&& handler, const char* command, int nParams = 0, const Oid *paramTypes = nullptr)
+	{
+		_name.resize(sizeof(intptr_t) * 2 + 1);
+		int n = sprintf(const_cast<char*>(_name.data()), "q%p", this);
+		_name.resize(n);
+		std::transform(_name.begin(), _name.end(), _name.begin(), tolower);
+		if (PQsendPrepare(m_conn, _name.data(), command, nParams, paramTypes))
+		{
+			async_wait([this, handler](error e) mutable {
+				if (!e)
+				{
+					m_res = PQgetResult(m_conn);
+					if (m_res)
+					{
+						m_res.verify_error<PGRES_COMMAND_OK>(e);
+						while(m_res)
+							m_res = PQgetResult(m_conn);
+					}
+				}
+				handler(e);
+			});
+		}
+		else
+		{
+			_name.clear();
+			handler(error(m_conn));
+		}
+	}
+
+	template<typename Handler, typename... Types>
+	void open(Handler&& handler, const char* command)
+	{
+		auto binder_list = make_binder_list(Types()...);
+		std::array<Oid, sizeof...(Types)> types;
+		std::transform(binder_list.begin(), binder_list.end(), types.begin(), [](const binder& b) {
+			return b.type();
+		});
+
+		open(std::forward<Handler>(handler), command, types.size(), types.data());
+	}
+
+	void close()
+	{
+		while (m_res)
+		{
+			m_res = PQgetResult(m_conn);
+		}
+
+		if (!_name.empty())
+		{
+			std::ostringstream oss;
+			oss << "DEALLOCATE " << _name << ";";
+			result res = PQexec(m_conn, oss.str().data());
+			error e;
+			res.verify_error<PGRES_COMMAND_OK, PGRES_TUPLES_OK>(e);
+			finish(res);
+			if(e) throw e;
+		}
+		base_statement::close();
+	}
+
+	template<typename Handler>
+	void close(Handler&& handler)
+	{
+		while (m_res)
+		{
+			if(PQisBusy(m_conn))
+			{
+				async_wait([this, handler](const error& e) mutable {
+					close(handler);
+				});
+			}
+			else
+			{
+				m_res = PQgetResult(m_conn);
+			}
+		}
+
+		if (!_name.empty() && PQstatus(m_conn) == CONNECTION_OK)
+		{
+			std::ostringstream oss;
+			oss << "DEALLOCATE " << _name << ";";
+			bool ok = PQsendQuery(m_conn, oss.str().data());
+			if (ok)
+			{
+				async_wait([this, handler](postgres::error e) mutable {
+					if (PQstatus(m_conn) == CONNECTION_OK)
+					{
+						result res(PQgetResult(m_conn));
+						if (res)
+							res.verify_error<PGRES_COMMAND_OK, PGRES_TUPLES_OK>(e);
+						if (!e) _name.clear();
+						finish(res);
+						handler(e);
+					}
+					else
+					{
+						_name.clear();
+						handler(error());
+					}
+				});
+			}
+			else
+			{
+				handler(error(m_conn));
+			}
+		}
+		else
+		{
+			_name.clear();
+		}
+	}
+
+	/*
+		ExecuteHandler defiens as:
+		void handler(const qtl::mysql::error& e, uint64_t affected);
+	 */
+	template<typename ExecuteHandler>
+	void execute(ExecuteHandler&& handler)
+	{
+		if (PQsendQueryPrepared(m_conn, _name.data(), 0, nullptr, nullptr, nullptr, 1) &&
+			PQsetSingleRowMode(m_conn))
+		{
+			async_wait([this, handler](error e) {
+				if (!e)
+				{
+					m_res = PQgetResult(m_conn);
+					m_res.verify_error<PGRES_COMMAND_OK, PGRES_SINGLE_TUPLE>(e);
+					finish(m_res);
+				}
+				handler(e);
+			});
+		}
+		else
+		{
+			handler(error(m_conn));
+		}
+	}
+
+	template<typename Types, typename Handler>
+	void execute(const Types& params, Handler&& handler)
+	{
+		const size_t count = qtl::params_binder<statement, Types>::size;
+		if (count > 0)
+		{
+			m_binders.resize(count);
+			qtl::bind_params(*this, params);
+
+			std::array<const char*, count> values;
+			std::array<int, count> lengths;
+			std::array<int, count> formats;
+			for (size_t i = 0; i != m_binders.size(); i++)
+			{
+				values[i] = m_binders[i].value();
+				lengths[i] = static_cast<int>(m_binders[i].length());
+				formats[i] = 1;
+			}
+			if (!PQsendQueryPrepared(m_conn, _name.data(), static_cast<int>(m_binders.size()), values.data(), lengths.data(), formats.data(), 1))
+			{
+				handler(error(m_conn), 0);
+				return;
+			}
+		}
+		else
+		{
+			if (!PQsendQueryPrepared(m_conn, _name.data(), 0, nullptr, nullptr, nullptr, 1))
+			{
+				handler(error(m_conn), 0);
+				return;
+			}
+		}
+		if (!PQsetSingleRowMode(m_conn))
+		{
+			handler(error(m_conn), 0);
+			return;
+		}
+		if (PQisBusy(m_conn))
+		{
+			async_wait([this, handler](error e) mutable {
+				if (!e)
+				{
+					m_res = PQgetResult(m_conn);
+					m_res.verify_error<PGRES_COMMAND_OK, PGRES_SINGLE_TUPLE>(e);
+					int64_t affected = m_res.affected_rows();
+					finish(m_res);
+					handler(e, affected);
+				}
+				else
+				{
+					handler(e, 0);
+				}
+			});
+		}
+	}
+
+	template<typename Types, typename RowHandler, typename FinishHandler>
+	void fetch(Types&& values, RowHandler&& row_handler, FinishHandler&& finish_handler)
+	{
+		if (m_res)
+		{
+			ExecStatusType status = m_res.status();
+			if (status == PGRES_SINGLE_TUPLE)
+			{
+				int count = m_res.get_column_count();
+				if (count > 0)
+				{
+					m_binders.resize(count);
+					for (int i = 0; i != count; i++)
+					{
+						m_binders[i] = binder(m_res.get_value(0, i), m_res.length(0, i),
+							m_res.get_column_type(i));
+					}
+					qtl::bind_record(*this, std::forward<Types>(values));
+				}
+				row_handler();
+				if (PQisBusy(m_conn))
+				{
+					async_wait([this, &values, row_handler, finish_handler](const error& e) {
+						if (e)
+						{
+							finish_handler(e);
+						}
+						else
+						{
+							m_res = PQgetResult(m_conn);
+							fetch(std::forward<Types>(values), row_handler, finish_handler);
+						}
+					});
+				}
+				else
+				{
+					m_res = PQgetResult(m_conn);
+					fetch(std::forward<Types>(values), row_handler, finish_handler);
+				}
+			}
+			else
+			{
+				error e;
+				m_res.verify_error<PGRES_TUPLES_OK>(e);
+				finish_handler(e);
+			}
+		}
+		else
+		{
+			finish_handler(error());
+		}
+	}
+
+	template<typename Handler>
+	void next_result(Handler&& handler)
+	{
+		async_wait([this, handler](const error& e) {
+			if (e)
+			{
+				handler(e);
+			}
+			else
+			{
+				m_res = PQgetResult(m_conn);
+				handler(error());
+			}
+		});
+	}
+
+private:
+	event* m_event;
+	int m_timeout;
+	template<typename Handler>
+	void async_wait(Handler&& handler)
+	{
+		qtl::postgres::async_wait(m_event, m_conn, m_timeout, std::forward<Handler>(handler));
+	}
+};
+
+class async_connection : public base_database, public qtl::async_connection<async_connection, async_statement>
+{
+public:
+	async_connection() : m_connect_timeout(2), m_query_timeout(2)
+	{
+	}
+	async_connection(async_connection&& src)
+		: base_database(std::move(src)), m_connect_timeout(src.m_connect_timeout), m_query_timeout(src.m_query_timeout)
+	{
+	}
+	async_connection& operator=(async_connection&& src)
+	{
+		if (this != &src)
+		{
+			base_database::operator=(std::move(src));
+			m_connect_timeout = src.m_connect_timeout;
+			m_query_timeout = src.m_query_timeout;
+		}
+		return *this;
+	}
+
+	/*
+		OpenHandler defines as:
+			void handler(const qtl::postgres::error& e) NOEXCEPT;
+	*/
+	template<typename EventLoop, typename OpenHandler>
+	void open(EventLoop& ev, OpenHandler&& handler, const std::map<std::string, std::string>& params, bool expand_dbname = false)
+	{
+		std::vector<const char*> keywords;
+		std::vector<const char*> values;
+		keywords.reserve(params.size());
+		values.reserve(params.size());
+		for (auto& param : params)
+		{
+			keywords.push_back(param.first.data());
+			values.push_back(param.second.data());
+		}
+		keywords.push_back(nullptr);
+		values.push_back(nullptr);
+		m_conn = PQconnectStartParams(keywords.data(), values.data(), expand_dbname);
+		if (m_conn == nullptr)
+			throw std::bad_alloc();
+		if (status() == CONNECTION_BAD)
+		{
+			handler(error(m_conn));
+			return;
+		}
+
+		if (PQsetnonblocking(m_conn, true)!=0)
+			handler(error(m_conn));
+		get_options();
+		bind(ev);
+		wait_connect(std::forward<OpenHandler>(handler));
+	}
+
+	template<typename EventLoop, typename OpenHandler>
+	void open(EventLoop& ev, OpenHandler&& handler, const char * conninfo)
+	{
+		m_conn = PQconnectStart(conninfo);
+		if (m_conn == nullptr)
+			throw std::bad_alloc();
+		if (status() == CONNECTION_BAD)
+		{
+			handler(error(m_conn));
+			return;
+		}
+
+		PQsetnonblocking(m_conn, true);
+		get_options();
+		bind(ev);
+		wait_connect(std::forward<OpenHandler>(handler));
+	}
+
+	template<typename OpenHandler>
+	void reset(OpenHandler&& handler)
+	{
+		PQresetStart(m_conn);
+		wait_reset(std::forward<OpenHandler>(handler));
+	}
+
+	/*
+		Handler defines as:
+			void handler(const qtl::mysql::error& e, uint64_t affected) NOEXCEPT;
+	*/
+	template<typename ExecuteHandler>
+	void simple_execute(ExecuteHandler&& handler, const char* query_text) NOEXCEPT
+	{
+		bool ok = PQsendQuery(m_conn, query_text);
+		if (ok)
+		{
+			async_wait([this, handler](postgres::error e) mutable {
+				result res(PQgetResult(m_conn));
+				res.verify_error<PGRES_COMMAND_OK, PGRES_TUPLES_OK>(e);
+				uint64_t affected = res.affected_rows();
+				handler(e, affected);
+				while (res)
+					res = PQgetResult(m_conn);
+			});
+		}
+		else
+		{
+			handler(error(m_conn), 0);
+		}
+	}
+
+	template<typename Handler>
+	void auto_commit(Handler&& handler, bool on) NOEXCEPT
+	{
+		simple_execute(std::forward<Handler>(handler),
+			on ? "SET AUTOCOMMIT TO ON" : "SET AUTOCOMMIT TO OFF");
+	}
+
+	template<typename Handler>
+	void begin_transaction(Handler&& handler) NOEXCEPT
+	{
+		simple_execute(std::forward<Handler>(handler), "BEGIN");
+	}
+
+	template<typename Handler>
+	void rollback(Handler&& handler) NOEXCEPT
+	{
+		simple_execute(std::forward<Handler>(handler), "ROLLBACK");
+	}
+
+	template<typename Handler>
+	void commit(Handler&& handler) NOEXCEPT
+	{
+		simple_execute(std::forward<Handler>(handler), "COMMIT");
+	}
+
+	/*
+		ResultHandler defines as:
+			void result_handler(const qtl::postgres::error& e) NOEXCEPT;
+	*/
+	template<typename RowHandler, typename ResultHandler>
+	void simple_query(const char* query, RowHandler&& row_handler, ResultHandler&& result_handler) NOEXCEPT
+	{
+		bool ok = PQsendQuery(m_conn, query);
+		if (ok)
+		{
+			async_wait([this, row_handler, result_handler](postgres::error e) mutable {
+				result res(PQgetResult(m_conn));
+				res.verify_error<PGRES_COMMAND_OK, PGRES_TUPLES_OK>(e);
+				if (e)
+				{
+					result_handler(e, 0);
+					return;
+				}
+				uint64_t affected = res.affected_rows();
+				while (res && res.status() == PGRES_TUPLES_OK)
+				{
+					simple_statment stmt(*this, std::move(res));
+					stmt.fetch_all(row_handler);
+					res = PQgetResult(m_conn);
+				}
+				result_handler(e, affected);
+			});
+		}
+		else
+		{
+			result_handler(error(m_conn), 0);
+		}
+	}
+
+	template<typename Handler>
+	void open_command(const char* query_text, size_t /*text_length*/, Handler&& handler)
+	{
+		std::shared_ptr<async_statement> stmt = std::make_shared<async_statement>(*this);
+		stmt->open([stmt, handler](const postgres::error& e) mutable {
+			handler(e, stmt);
+		}, query_text, 0);
+	}
+
+	template<typename Handler>
+	bool is_alive(Handler&& handler) NOEXCEPT
+	{
+		simple_execute(std::forward<Handler>(handler), "");
+	}
+
+	socket_type socket() const NOEXCEPT { return PQsocket(m_conn); }
+
+	int connect_timeout() const { return m_connect_timeout; }
+	void connect_timeout(int timeout) { m_connect_timeout = timeout; }
+	int query_timeout() const { return m_query_timeout; }
+	void query_timeout(int timeout) { m_query_timeout = timeout; }
+
+private:
+	int m_connect_timeout;
+	int m_query_timeout;
+
+	void get_options()
+	{
+		PQconninfoOption* options = PQconninfo(m_conn);
+		m_connect_timeout = 2;
+		for (PQconninfoOption* option = options; option; option++)
+		{
+			if (strcmp(option->keyword, "connect_timeout") == 0)
+			{
+				if (option->val)
+					m_connect_timeout = atoi(option->val);
+				break;
+			}
+		}
+		PQconninfoFree(options);
+	}
+
+	template<typename OpenHandler>
+	void wait_connect(OpenHandler&& handler) NOEXCEPT
+	{
+		PostgresPollingStatusType status = PQconnectPoll(m_conn);
+		switch (status)
+		{
+		case PGRES_POLLING_READING:
+		case PGRES_POLLING_WRITING:
+			m_event_handler->set_io_handler(event_flags(status), m_connect_timeout,
+				[this, handler](int flags) mutable {
+				if (flags&event::ef_timeout)
+				{
+					handler(postgres::timeout());
+				}
+				else if(flags&(event::ef_read|event::ef_write | event::ef_exception))
+					wait_connect(std::forward<OpenHandler>(handler));
+			});
+			break;
+		case PGRES_POLLING_FAILED:
+			handler(postgres::error(handle()));
+			break;
+		case PGRES_POLLING_OK:
+			//PQsetnonblocking(m_conn, true);
+			handler(postgres::error());
+		}
+	}
+
+	template<typename OpenHandler>
+	void wait_reset(OpenHandler&& handler) NOEXCEPT
+	{
+		PostgresPollingStatusType status = PQresetPoll(m_conn);
+		switch (status)
+		{
+		case PGRES_POLLING_READING:
+		case PGRES_POLLING_WRITING:
+			m_event_handler->set_io_handler(event_flags(status), m_connect_timeout,
+				[this, handler](int flags) mutable {
+				if (flags&event::ef_timeout)
+				{
+					handler(postgres::timeout());
+				}
+				else if (flags&(event::ef_read | event::ef_write | event::ef_exception))
+					wait_reset(std::forward<OpenHandler>(handler));
+			});
+			break;
+		case PGRES_POLLING_FAILED:
+			handler(postgres::error(m_conn));
+			break;
+		case PGRES_POLLING_OK:
+			handler(postgres::error());
+		}
+	}
+
+	template<typename Handler>
+	void async_wait(Handler&& handler)
+	{
+		qtl::postgres::async_wait(event(), m_conn, m_query_timeout, std::forward<Handler>(handler));
+	}
+
+};
+
+inline async_statement::async_statement(async_connection& db)
+	: base_statement(static_cast<base_database&>(db))
+{
+	m_event = db.event();
+	m_timeout = db.query_timeout();
+}
+
+
 typedef qtl::transaction<database> transaction;
 
 template<typename Record>

--
Gitblit v1.9.3