From 9b81cdebba95f1b6e687191c630c1f8f6cf0df16 Mon Sep 17 00:00:00 2001
From: znone <glyc@sina.com.cn>
Date: Thu, 25 Mar 2021 12:44:30 +0000
Subject: [PATCH] PostgreSQL: Support asynchronous operation Fix bug in different versions of asio
---
include/qtl_postgres.hpp | 753 +++++++++++++++++++++++++++++++++++++++++++++++++++++++--
1 files changed, 719 insertions(+), 34 deletions(-)
diff --git a/include/qtl_postgres.hpp b/include/qtl_postgres.hpp
index 4bc2148..3418c22 100644
--- a/include/qtl_postgres.hpp
+++ b/include/qtl_postgres.hpp
@@ -13,6 +13,7 @@
#include <algorithm>
#include <assert.h>
#include "qtl_common.hpp"
+#include "qtl_async.hpp"
#define FRONTEND
@@ -26,7 +27,6 @@
extern "C"
{
-#include <c.h>
#include <postgres.h>
#include <catalog/pg_type.h>
}
@@ -164,11 +164,11 @@
class error : public std::exception
{
public:
- error() : m_errmsg(nullptr) { }
+ error() : m_errmsg() { }
explicit error(PGconn* conn, PGVerbosity verbosity = PQERRORS_DEFAULT, PGContextVisibility show_context = PQSHOW_CONTEXT_ERRORS)
{
- PQsetErrorVerbosity(conn, verbosity);
- PQsetErrorContextVisibility(conn, show_context);
+ //PQsetErrorVerbosity(conn, verbosity);
+ //PQsetErrorContextVisibility(conn, show_context);
const char* errmsg = PQerrorMessage(conn);
if (errmsg) m_errmsg = errmsg;
else m_errmsg.clear();
@@ -182,9 +182,19 @@
}
virtual const char* what() const NOEXCEPT override { return m_errmsg.data(); }
+ operator bool() const { return !m_errmsg.empty(); }
-private:
+protected:
std::string m_errmsg;
+};
+
+class timeout : public error
+{
+public:
+ timeout()
+ {
+ m_errmsg = "timeout";
+ }
};
inline void verify_pgtypes_error(int ret)
@@ -992,7 +1002,7 @@
if (end - data < size)
throw std::overflow_error("insufficient data left in message");
data = object_traits<T>::get(value, data, data + size);
- if (data >= end)
+ if (data > end)
throw std::overflow_error("insufficient data left in message");
result.push_back(value);
}
@@ -1494,6 +1504,17 @@
}
}
+ template<ExecStatusType... Excepted>
+ void verify_error(error& e)
+ {
+ if (m_res)
+ {
+ ExecStatusType got = status();
+ if (!in<ExecStatusType, Excepted...>(got))
+ e = error(m_res);
+ }
+ }
+
void clear()
{
if (m_res)
@@ -1517,15 +1538,27 @@
}
base_statement(const base_statement&) = delete;
base_statement(base_statement&& src)
- : m_conn(src.m_conn), m_binders(std::move(src.m_binders)), m_res(std::move(src.m_res))
+ : m_conn(src.m_conn), m_binders(std::move(src.m_binders)), m_res(std::move(src.m_res)), _name(std::move(src._name))
{
+ }
+ base_statement& operator=(const base_statement&) = delete;
+ base_statement& operator=(base_statement&& src)
+ {
+ if (this != &src)
+ {
+ close();
+ m_conn = src.m_conn;
+ m_binders = std::move(src.m_binders);
+ m_res = std::move(src.m_res);
+ }
+ return *this;
}
result& get_result() { return m_res; }
void close()
{
- m_res=nullptr;
+ m_res = nullptr;
}
uint64_t affetced_rows() const
@@ -1614,6 +1647,7 @@
protected:
PGconn* m_conn;
result m_res;
+ std::string _name;
std::vector<binder> m_binders;
template<ExecStatusType... Excepted>
@@ -1624,6 +1658,13 @@
else
throw error(m_conn);
}
+ void finish(result& res)
+ {
+ while (res)
+ {
+ res = PQgetResult(m_conn);
+ }
+ }
};
class statement : public base_statement
@@ -1633,7 +1674,7 @@
{
}
statement(const statement&) = delete;
- statement(statement&& src) : base_statement(std::move(src)), _name(std::move(src._name))
+ statement(statement&& src) : base_statement(std::move(src))
{
}
@@ -1761,17 +1802,6 @@
finish(m_res);
m_res.clear();
}
-
-private:
- std::string _name;
-
- void finish(result& res)
- {
- while (res)
- {
- res = PQgetResult(m_conn);
- }
- }
};
class base_database
@@ -1783,6 +1813,8 @@
}
public:
+ typedef postgres::error exception_type;
+
base_database(const base_database&) = delete;
base_database(base_database&& src)
{
@@ -1796,8 +1828,8 @@
PQfinish(m_conn);
}
- base_database& operator==(const base_database&) = delete;
- base_database& operator==(base_database&& src)
+ base_database& operator=(const base_database&) = delete;
+ base_database& operator=(base_database&& src)
{
if (this != &src)
{
@@ -1882,12 +1914,10 @@
PQreset(m_conn);
}
- bool is_alive()
+ void close()
{
- }
-
- PGPing ping()
- {
+ PQfinish(m_conn);
+ m_conn = nullptr;
}
protected:
@@ -1920,7 +1950,7 @@
m_res.get_column_type(j));
}
qtl::bind_record(*this, std::forward<decltype(values)>(values));
- proc(values);
+ qtl::detail::apply(proc, std::forward<decltype(values)>(values));
}
}
}
@@ -1959,12 +1989,6 @@
sprintf(port_text, "%u", port);
m_conn = PQsetdbLogin(host, port_text, options, nullptr, db, user, password);
return m_conn != nullptr && status() == CONNECTION_OK;
- }
-
- void close()
- {
- PQfinish(m_conn);
- m_conn = nullptr;
}
statement open_command(const char* query_text, size_t /*text_length*/)
@@ -2023,8 +2047,669 @@
simple_execute("COMMIT");
}
+ bool is_alive()
+ {
+ qtl::postgres::result res(PQexec(m_conn, ""));
+ return res && res.status() == PGRES_COMMAND_OK;
+ }
+
};
+inline int event_flags(PostgresPollingStatusType status)
+{
+ int flags = 0;
+ if (status == PGRES_POLLING_READING)
+ flags |= event::ef_read;
+ else if (status == PGRES_POLLING_WRITING)
+ flags |= event::ef_write;
+ else if (status == PGRES_POLLING_FAILED)
+ flags |= event::ef_exception;
+ return flags;
+}
+
+class async_connection;
+
+template<typename Handler>
+inline void async_wait(qtl::event* event, PGconn* conn, int timeout, Handler&& handler)
+{
+ int flushed = PQflush(conn);
+ if (flushed < 0)
+ {
+ handler(error(conn));
+ return;
+ }
+ if (flushed == 1)
+ {
+ event->set_io_handler(qtl::event::ef_read | qtl::event::ef_write, timeout,
+ [event, conn, timeout, handler](int flags) mutable {
+ if (flags&qtl::event::ef_timeout)
+ {
+ handler(postgres::timeout());
+ return;
+ }
+ if (flags&qtl::event::ef_read)
+ {
+ if (!PQconsumeInput(conn))
+ {
+ handler(error(conn));
+ return;
+ }
+ }
+ if (flags&(qtl::event::ef_read | qtl::event::ef_write | event::ef_exception))
+ async_wait(event, conn, timeout, handler);
+ });
+ }
+ else
+ {
+ event->set_io_handler(qtl::event::ef_read, 10,
+ [event, conn, timeout, handler](int flags) mutable {
+ if (flags&qtl::event::ef_timeout)
+ {
+ handler(postgres::timeout());
+ }
+ else if (flags&(qtl::event::ef_read | qtl::event::ef_exception))
+ {
+ if (PQconsumeInput(conn))
+ {
+ if (!PQisBusy(conn))
+ handler(postgres::error());
+ else
+ async_wait(event, conn, timeout, handler);
+ }
+ else
+ {
+ handler(postgres::error(conn));
+ }
+ }
+ else
+ {
+ handler(postgres::error(conn));
+ }
+ });
+ }
+}
+
+class async_statement : public base_statement
+{
+public:
+ async_statement(async_connection& db);
+ async_statement(async_statement&& src)
+ : base_statement(std::move(src)), m_timeout(2)
+ {
+ m_event = src.m_event;
+ m_timeout = src.m_timeout;
+ src.m_event = nullptr;
+ }
+ async_statement& operator=(async_statement&& src)
+ {
+ if (this != &src)
+ {
+ base_statement::operator =(std::move(src));
+ m_event = src.m_event;
+ m_timeout = src.m_timeout;
+ src.m_event = nullptr;
+ }
+ return *this;
+ }
+ ~async_statement()
+ {
+ close();
+ }
+
+ /*
+ Handler defiens as:
+ void handler(const qtl::mysql::error& e);
+ */
+ template<typename Handler>
+ void open(Handler&& handler, const char* command, int nParams = 0, const Oid *paramTypes = nullptr)
+ {
+ _name.resize(sizeof(intptr_t) * 2 + 1);
+ int n = sprintf(const_cast<char*>(_name.data()), "q%p", this);
+ _name.resize(n);
+ std::transform(_name.begin(), _name.end(), _name.begin(), tolower);
+ if (PQsendPrepare(m_conn, _name.data(), command, nParams, paramTypes))
+ {
+ async_wait([this, handler](error e) mutable {
+ if (!e)
+ {
+ m_res = PQgetResult(m_conn);
+ if (m_res)
+ {
+ m_res.verify_error<PGRES_COMMAND_OK>(e);
+ while(m_res)
+ m_res = PQgetResult(m_conn);
+ }
+ }
+ handler(e);
+ });
+ }
+ else
+ {
+ _name.clear();
+ handler(error(m_conn));
+ }
+ }
+
+ template<typename Handler, typename... Types>
+ void open(Handler&& handler, const char* command)
+ {
+ auto binder_list = make_binder_list(Types()...);
+ std::array<Oid, sizeof...(Types)> types;
+ std::transform(binder_list.begin(), binder_list.end(), types.begin(), [](const binder& b) {
+ return b.type();
+ });
+
+ open(std::forward<Handler>(handler), command, types.size(), types.data());
+ }
+
+ void close()
+ {
+ while (m_res)
+ {
+ m_res = PQgetResult(m_conn);
+ }
+
+ if (!_name.empty())
+ {
+ std::ostringstream oss;
+ oss << "DEALLOCATE " << _name << ";";
+ result res = PQexec(m_conn, oss.str().data());
+ error e;
+ res.verify_error<PGRES_COMMAND_OK, PGRES_TUPLES_OK>(e);
+ finish(res);
+ if(e) throw e;
+ }
+ base_statement::close();
+ }
+
+ template<typename Handler>
+ void close(Handler&& handler)
+ {
+ while (m_res)
+ {
+ if(PQisBusy(m_conn))
+ {
+ async_wait([this, handler](const error& e) mutable {
+ close(handler);
+ });
+ }
+ else
+ {
+ m_res = PQgetResult(m_conn);
+ }
+ }
+
+ if (!_name.empty() && PQstatus(m_conn) == CONNECTION_OK)
+ {
+ std::ostringstream oss;
+ oss << "DEALLOCATE " << _name << ";";
+ bool ok = PQsendQuery(m_conn, oss.str().data());
+ if (ok)
+ {
+ async_wait([this, handler](postgres::error e) mutable {
+ if (PQstatus(m_conn) == CONNECTION_OK)
+ {
+ result res(PQgetResult(m_conn));
+ if (res)
+ res.verify_error<PGRES_COMMAND_OK, PGRES_TUPLES_OK>(e);
+ if (!e) _name.clear();
+ finish(res);
+ handler(e);
+ }
+ else
+ {
+ _name.clear();
+ handler(error());
+ }
+ });
+ }
+ else
+ {
+ handler(error(m_conn));
+ }
+ }
+ else
+ {
+ _name.clear();
+ }
+ }
+
+ /*
+ ExecuteHandler defiens as:
+ void handler(const qtl::mysql::error& e, uint64_t affected);
+ */
+ template<typename ExecuteHandler>
+ void execute(ExecuteHandler&& handler)
+ {
+ if (PQsendQueryPrepared(m_conn, _name.data(), 0, nullptr, nullptr, nullptr, 1) &&
+ PQsetSingleRowMode(m_conn))
+ {
+ async_wait([this, handler](error e) {
+ if (!e)
+ {
+ m_res = PQgetResult(m_conn);
+ m_res.verify_error<PGRES_COMMAND_OK, PGRES_SINGLE_TUPLE>(e);
+ finish(m_res);
+ }
+ handler(e);
+ });
+ }
+ else
+ {
+ handler(error(m_conn));
+ }
+ }
+
+ template<typename Types, typename Handler>
+ void execute(const Types& params, Handler&& handler)
+ {
+ const size_t count = qtl::params_binder<statement, Types>::size;
+ if (count > 0)
+ {
+ m_binders.resize(count);
+ qtl::bind_params(*this, params);
+
+ std::array<const char*, count> values;
+ std::array<int, count> lengths;
+ std::array<int, count> formats;
+ for (size_t i = 0; i != m_binders.size(); i++)
+ {
+ values[i] = m_binders[i].value();
+ lengths[i] = static_cast<int>(m_binders[i].length());
+ formats[i] = 1;
+ }
+ if (!PQsendQueryPrepared(m_conn, _name.data(), static_cast<int>(m_binders.size()), values.data(), lengths.data(), formats.data(), 1))
+ {
+ handler(error(m_conn), 0);
+ return;
+ }
+ }
+ else
+ {
+ if (!PQsendQueryPrepared(m_conn, _name.data(), 0, nullptr, nullptr, nullptr, 1))
+ {
+ handler(error(m_conn), 0);
+ return;
+ }
+ }
+ if (!PQsetSingleRowMode(m_conn))
+ {
+ handler(error(m_conn), 0);
+ return;
+ }
+ if (PQisBusy(m_conn))
+ {
+ async_wait([this, handler](error e) mutable {
+ if (!e)
+ {
+ m_res = PQgetResult(m_conn);
+ m_res.verify_error<PGRES_COMMAND_OK, PGRES_SINGLE_TUPLE>(e);
+ int64_t affected = m_res.affected_rows();
+ finish(m_res);
+ handler(e, affected);
+ }
+ else
+ {
+ handler(e, 0);
+ }
+ });
+ }
+ }
+
+ template<typename Types, typename RowHandler, typename FinishHandler>
+ void fetch(Types&& values, RowHandler&& row_handler, FinishHandler&& finish_handler)
+ {
+ if (m_res)
+ {
+ ExecStatusType status = m_res.status();
+ if (status == PGRES_SINGLE_TUPLE)
+ {
+ int count = m_res.get_column_count();
+ if (count > 0)
+ {
+ m_binders.resize(count);
+ for (int i = 0; i != count; i++)
+ {
+ m_binders[i] = binder(m_res.get_value(0, i), m_res.length(0, i),
+ m_res.get_column_type(i));
+ }
+ qtl::bind_record(*this, std::forward<Types>(values));
+ }
+ row_handler();
+ if (PQisBusy(m_conn))
+ {
+ async_wait([this, &values, row_handler, finish_handler](const error& e) {
+ if (e)
+ {
+ finish_handler(e);
+ }
+ else
+ {
+ m_res = PQgetResult(m_conn);
+ fetch(std::forward<Types>(values), row_handler, finish_handler);
+ }
+ });
+ }
+ else
+ {
+ m_res = PQgetResult(m_conn);
+ fetch(std::forward<Types>(values), row_handler, finish_handler);
+ }
+ }
+ else
+ {
+ error e;
+ m_res.verify_error<PGRES_TUPLES_OK>(e);
+ finish_handler(e);
+ }
+ }
+ else
+ {
+ finish_handler(error());
+ }
+ }
+
+ template<typename Handler>
+ void next_result(Handler&& handler)
+ {
+ async_wait([this, handler](const error& e) {
+ if (e)
+ {
+ handler(e);
+ }
+ else
+ {
+ m_res = PQgetResult(m_conn);
+ handler(error());
+ }
+ });
+ }
+
+private:
+ event* m_event;
+ int m_timeout;
+ template<typename Handler>
+ void async_wait(Handler&& handler)
+ {
+ qtl::postgres::async_wait(m_event, m_conn, m_timeout, std::forward<Handler>(handler));
+ }
+};
+
+class async_connection : public base_database, public qtl::async_connection<async_connection, async_statement>
+{
+public:
+ async_connection() : m_connect_timeout(2), m_query_timeout(2)
+ {
+ }
+ async_connection(async_connection&& src)
+ : base_database(std::move(src)), m_connect_timeout(src.m_connect_timeout), m_query_timeout(src.m_query_timeout)
+ {
+ }
+ async_connection& operator=(async_connection&& src)
+ {
+ if (this != &src)
+ {
+ base_database::operator=(std::move(src));
+ m_connect_timeout = src.m_connect_timeout;
+ m_query_timeout = src.m_query_timeout;
+ }
+ return *this;
+ }
+
+ /*
+ OpenHandler defines as:
+ void handler(const qtl::postgres::error& e) NOEXCEPT;
+ */
+ template<typename EventLoop, typename OpenHandler>
+ void open(EventLoop& ev, OpenHandler&& handler, const std::map<std::string, std::string>& params, bool expand_dbname = false)
+ {
+ std::vector<const char*> keywords;
+ std::vector<const char*> values;
+ keywords.reserve(params.size());
+ values.reserve(params.size());
+ for (auto& param : params)
+ {
+ keywords.push_back(param.first.data());
+ values.push_back(param.second.data());
+ }
+ keywords.push_back(nullptr);
+ values.push_back(nullptr);
+ m_conn = PQconnectStartParams(keywords.data(), values.data(), expand_dbname);
+ if (m_conn == nullptr)
+ throw std::bad_alloc();
+ if (status() == CONNECTION_BAD)
+ {
+ handler(error(m_conn));
+ return;
+ }
+
+ if (PQsetnonblocking(m_conn, true)!=0)
+ handler(error(m_conn));
+ get_options();
+ bind(ev);
+ wait_connect(std::forward<OpenHandler>(handler));
+ }
+
+ template<typename EventLoop, typename OpenHandler>
+ void open(EventLoop& ev, OpenHandler&& handler, const char * conninfo)
+ {
+ m_conn = PQconnectStart(conninfo);
+ if (m_conn == nullptr)
+ throw std::bad_alloc();
+ if (status() == CONNECTION_BAD)
+ {
+ handler(error(m_conn));
+ return;
+ }
+
+ PQsetnonblocking(m_conn, true);
+ get_options();
+ bind(ev);
+ wait_connect(std::forward<OpenHandler>(handler));
+ }
+
+ template<typename OpenHandler>
+ void reset(OpenHandler&& handler)
+ {
+ PQresetStart(m_conn);
+ wait_reset(std::forward<OpenHandler>(handler));
+ }
+
+ /*
+ Handler defines as:
+ void handler(const qtl::mysql::error& e, uint64_t affected) NOEXCEPT;
+ */
+ template<typename ExecuteHandler>
+ void simple_execute(ExecuteHandler&& handler, const char* query_text) NOEXCEPT
+ {
+ bool ok = PQsendQuery(m_conn, query_text);
+ if (ok)
+ {
+ async_wait([this, handler](postgres::error e) mutable {
+ result res(PQgetResult(m_conn));
+ res.verify_error<PGRES_COMMAND_OK, PGRES_TUPLES_OK>(e);
+ uint64_t affected = res.affected_rows();
+ handler(e, affected);
+ while (res)
+ res = PQgetResult(m_conn);
+ });
+ }
+ else
+ {
+ handler(error(m_conn), 0);
+ }
+ }
+
+ template<typename Handler>
+ void auto_commit(Handler&& handler, bool on) NOEXCEPT
+ {
+ simple_execute(std::forward<Handler>(handler),
+ on ? "SET AUTOCOMMIT TO ON" : "SET AUTOCOMMIT TO OFF");
+ }
+
+ template<typename Handler>
+ void begin_transaction(Handler&& handler) NOEXCEPT
+ {
+ simple_execute(std::forward<Handler>(handler), "BEGIN");
+ }
+
+ template<typename Handler>
+ void rollback(Handler&& handler) NOEXCEPT
+ {
+ simple_execute(std::forward<Handler>(handler), "ROLLBACK");
+ }
+
+ template<typename Handler>
+ void commit(Handler&& handler) NOEXCEPT
+ {
+ simple_execute(std::forward<Handler>(handler), "COMMIT");
+ }
+
+ /*
+ ResultHandler defines as:
+ void result_handler(const qtl::postgres::error& e) NOEXCEPT;
+ */
+ template<typename RowHandler, typename ResultHandler>
+ void simple_query(const char* query, RowHandler&& row_handler, ResultHandler&& result_handler) NOEXCEPT
+ {
+ bool ok = PQsendQuery(m_conn, query);
+ if (ok)
+ {
+ async_wait([this, row_handler, result_handler](postgres::error e) mutable {
+ result res(PQgetResult(m_conn));
+ res.verify_error<PGRES_COMMAND_OK, PGRES_TUPLES_OK>(e);
+ if (e)
+ {
+ result_handler(e, 0);
+ return;
+ }
+ uint64_t affected = res.affected_rows();
+ while (res && res.status() == PGRES_TUPLES_OK)
+ {
+ simple_statment stmt(*this, std::move(res));
+ stmt.fetch_all(row_handler);
+ res = PQgetResult(m_conn);
+ }
+ result_handler(e, affected);
+ });
+ }
+ else
+ {
+ result_handler(error(m_conn), 0);
+ }
+ }
+
+ template<typename Handler>
+ void open_command(const char* query_text, size_t /*text_length*/, Handler&& handler)
+ {
+ std::shared_ptr<async_statement> stmt = std::make_shared<async_statement>(*this);
+ stmt->open([stmt, handler](const postgres::error& e) mutable {
+ handler(e, stmt);
+ }, query_text, 0);
+ }
+
+ template<typename Handler>
+ bool is_alive(Handler&& handler) NOEXCEPT
+ {
+ simple_execute(std::forward<Handler>(handler), "");
+ }
+
+ socket_type socket() const NOEXCEPT { return PQsocket(m_conn); }
+
+ int connect_timeout() const { return m_connect_timeout; }
+ void connect_timeout(int timeout) { m_connect_timeout = timeout; }
+ int query_timeout() const { return m_query_timeout; }
+ void query_timeout(int timeout) { m_query_timeout = timeout; }
+
+private:
+ int m_connect_timeout;
+ int m_query_timeout;
+
+ void get_options()
+ {
+ PQconninfoOption* options = PQconninfo(m_conn);
+ m_connect_timeout = 2;
+ for (PQconninfoOption* option = options; option; option++)
+ {
+ if (strcmp(option->keyword, "connect_timeout") == 0)
+ {
+ if (option->val)
+ m_connect_timeout = atoi(option->val);
+ break;
+ }
+ }
+ PQconninfoFree(options);
+ }
+
+ template<typename OpenHandler>
+ void wait_connect(OpenHandler&& handler) NOEXCEPT
+ {
+ PostgresPollingStatusType status = PQconnectPoll(m_conn);
+ switch (status)
+ {
+ case PGRES_POLLING_READING:
+ case PGRES_POLLING_WRITING:
+ m_event_handler->set_io_handler(event_flags(status), m_connect_timeout,
+ [this, handler](int flags) mutable {
+ if (flags&event::ef_timeout)
+ {
+ handler(postgres::timeout());
+ }
+ else if(flags&(event::ef_read|event::ef_write | event::ef_exception))
+ wait_connect(std::forward<OpenHandler>(handler));
+ });
+ break;
+ case PGRES_POLLING_FAILED:
+ handler(postgres::error(handle()));
+ break;
+ case PGRES_POLLING_OK:
+ //PQsetnonblocking(m_conn, true);
+ handler(postgres::error());
+ }
+ }
+
+ template<typename OpenHandler>
+ void wait_reset(OpenHandler&& handler) NOEXCEPT
+ {
+ PostgresPollingStatusType status = PQresetPoll(m_conn);
+ switch (status)
+ {
+ case PGRES_POLLING_READING:
+ case PGRES_POLLING_WRITING:
+ m_event_handler->set_io_handler(event_flags(status), m_connect_timeout,
+ [this, handler](int flags) mutable {
+ if (flags&event::ef_timeout)
+ {
+ handler(postgres::timeout());
+ }
+ else if (flags&(event::ef_read | event::ef_write | event::ef_exception))
+ wait_reset(std::forward<OpenHandler>(handler));
+ });
+ break;
+ case PGRES_POLLING_FAILED:
+ handler(postgres::error(m_conn));
+ break;
+ case PGRES_POLLING_OK:
+ handler(postgres::error());
+ }
+ }
+
+ template<typename Handler>
+ void async_wait(Handler&& handler)
+ {
+ qtl::postgres::async_wait(event(), m_conn, m_query_timeout, std::forward<Handler>(handler));
+ }
+
+};
+
+inline async_statement::async_statement(async_connection& db)
+ : base_statement(static_cast<base_database&>(db))
+{
+ m_event = db.event();
+ m_timeout = db.query_timeout();
+}
+
+
typedef qtl::transaction<database> transaction;
template<typename Record>
--
Gitblit v1.9.3