| | |
| | | #include <algorithm> |
| | | #include <assert.h> |
| | | #include "qtl_common.hpp" |
| | | #include "qtl_async.hpp" |
| | | |
| | | #define FRONTEND |
| | | |
| | |
| | | |
| | | extern "C" |
| | | { |
| | | #include <c.h> |
| | | #include <postgres.h> |
| | | #include <catalog/pg_type.h> |
| | | } |
| | |
| | | class error : public std::exception |
| | | { |
| | | public: |
| | | error() : m_errmsg(nullptr) { } |
| | | error() : m_errmsg() { } |
| | | explicit error(PGconn* conn, PGVerbosity verbosity = PQERRORS_DEFAULT, PGContextVisibility show_context = PQSHOW_CONTEXT_ERRORS) |
| | | { |
| | | PQsetErrorVerbosity(conn, verbosity); |
| | | PQsetErrorContextVisibility(conn, show_context); |
| | | //PQsetErrorVerbosity(conn, verbosity); |
| | | //PQsetErrorContextVisibility(conn, show_context); |
| | | const char* errmsg = PQerrorMessage(conn); |
| | | if (errmsg) m_errmsg = errmsg; |
| | | else m_errmsg.clear(); |
| | |
| | | } |
| | | |
| | | virtual const char* what() const NOEXCEPT override { return m_errmsg.data(); } |
| | | operator bool() const { return !m_errmsg.empty(); } |
| | | |
| | | private: |
| | | protected: |
| | | std::string m_errmsg; |
| | | }; |
| | | |
| | | class timeout : public error |
| | | { |
| | | public: |
| | | timeout() |
| | | { |
| | | m_errmsg = "timeout"; |
| | | } |
| | | }; |
| | | |
| | | inline void verify_pgtypes_error(int ret) |
| | |
| | | if (end - data < size) |
| | | throw std::overflow_error("insufficient data left in message"); |
| | | data = object_traits<T>::get(value, data, data + size); |
| | | if (data >= end) |
| | | if (data > end) |
| | | throw std::overflow_error("insufficient data left in message"); |
| | | result.push_back(value); |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | template<ExecStatusType... Excepted> |
| | | void verify_error(error& e) |
| | | { |
| | | if (m_res) |
| | | { |
| | | ExecStatusType got = status(); |
| | | if (!in<ExecStatusType, Excepted...>(got)) |
| | | e = error(m_res); |
| | | } |
| | | } |
| | | |
| | | void clear() |
| | | { |
| | | if (m_res) |
| | |
| | | } |
| | | base_statement(const base_statement&) = delete; |
| | | base_statement(base_statement&& src) |
| | | : m_conn(src.m_conn), m_binders(std::move(src.m_binders)), m_res(std::move(src.m_res)) |
| | | : m_conn(src.m_conn), m_binders(std::move(src.m_binders)), m_res(std::move(src.m_res)), _name(std::move(src._name)) |
| | | { |
| | | } |
| | | base_statement& operator=(const base_statement&) = delete; |
| | | base_statement& operator=(base_statement&& src) |
| | | { |
| | | if (this != &src) |
| | | { |
| | | close(); |
| | | m_conn = src.m_conn; |
| | | m_binders = std::move(src.m_binders); |
| | | m_res = std::move(src.m_res); |
| | | } |
| | | return *this; |
| | | } |
| | | |
| | | result& get_result() { return m_res; } |
| | | |
| | | void close() |
| | | { |
| | | m_res=nullptr; |
| | | m_res = nullptr; |
| | | } |
| | | |
| | | uint64_t affetced_rows() const |
| | |
| | | protected: |
| | | PGconn* m_conn; |
| | | result m_res; |
| | | std::string _name; |
| | | std::vector<binder> m_binders; |
| | | |
| | | template<ExecStatusType... Excepted> |
| | |
| | | else |
| | | throw error(m_conn); |
| | | } |
| | | void finish(result& res) |
| | | { |
| | | while (res) |
| | | { |
| | | res = PQgetResult(m_conn); |
| | | } |
| | | } |
| | | }; |
| | | |
| | | class statement : public base_statement |
| | |
| | | { |
| | | } |
| | | statement(const statement&) = delete; |
| | | statement(statement&& src) : base_statement(std::move(src)), _name(std::move(src._name)) |
| | | statement(statement&& src) : base_statement(std::move(src)) |
| | | { |
| | | } |
| | | |
| | |
| | | finish(m_res); |
| | | m_res.clear(); |
| | | } |
| | | |
| | | private: |
| | | std::string _name; |
| | | |
| | | void finish(result& res) |
| | | { |
| | | while (res) |
| | | { |
| | | res = PQgetResult(m_conn); |
| | | } |
| | | } |
| | | }; |
| | | |
| | | class base_database |
| | |
| | | } |
| | | |
| | | public: |
| | | typedef postgres::error exception_type; |
| | | |
| | | base_database(const base_database&) = delete; |
| | | base_database(base_database&& src) |
| | | { |
| | |
| | | PQfinish(m_conn); |
| | | } |
| | | |
| | | base_database& operator==(const base_database&) = delete; |
| | | base_database& operator==(base_database&& src) |
| | | base_database& operator=(const base_database&) = delete; |
| | | base_database& operator=(base_database&& src) |
| | | { |
| | | if (this != &src) |
| | | { |
| | |
| | | PQreset(m_conn); |
| | | } |
| | | |
| | | bool is_alive() |
| | | void close() |
| | | { |
| | | } |
| | | |
| | | PGPing ping() |
| | | { |
| | | PQfinish(m_conn); |
| | | m_conn = nullptr; |
| | | } |
| | | |
| | | protected: |
| | |
| | | m_res.get_column_type(j)); |
| | | } |
| | | qtl::bind_record(*this, std::forward<decltype(values)>(values)); |
| | | proc(values); |
| | | qtl::detail::apply(proc, std::forward<decltype(values)>(values)); |
| | | } |
| | | } |
| | | } |
| | |
| | | sprintf(port_text, "%u", port); |
| | | m_conn = PQsetdbLogin(host, port_text, options, nullptr, db, user, password); |
| | | return m_conn != nullptr && status() == CONNECTION_OK; |
| | | } |
| | | |
| | | void close() |
| | | { |
| | | PQfinish(m_conn); |
| | | m_conn = nullptr; |
| | | } |
| | | |
| | | statement open_command(const char* query_text, size_t /*text_length*/) |
| | |
| | | simple_execute("COMMIT"); |
| | | } |
| | | |
| | | bool is_alive() |
| | | { |
| | | qtl::postgres::result res(PQexec(m_conn, "")); |
| | | return res && res.status() == PGRES_COMMAND_OK; |
| | | } |
| | | |
| | | }; |
| | | |
| | | inline int event_flags(PostgresPollingStatusType status) |
| | | { |
| | | int flags = 0; |
| | | if (status == PGRES_POLLING_READING) |
| | | flags |= event::ef_read; |
| | | else if (status == PGRES_POLLING_WRITING) |
| | | flags |= event::ef_write; |
| | | else if (status == PGRES_POLLING_FAILED) |
| | | flags |= event::ef_exception; |
| | | return flags; |
| | | } |
| | | |
| | | class async_connection; |
| | | |
| | | template<typename Handler> |
| | | inline void async_wait(qtl::event* event, PGconn* conn, int timeout, Handler&& handler) |
| | | { |
| | | int flushed = PQflush(conn); |
| | | if (flushed < 0) |
| | | { |
| | | handler(error(conn)); |
| | | return; |
| | | } |
| | | if (flushed == 1) |
| | | { |
| | | event->set_io_handler(qtl::event::ef_read | qtl::event::ef_write, timeout, |
| | | [event, conn, timeout, handler](int flags) mutable { |
| | | if (flags&qtl::event::ef_timeout) |
| | | { |
| | | handler(postgres::timeout()); |
| | | return; |
| | | } |
| | | if (flags&qtl::event::ef_read) |
| | | { |
| | | if (!PQconsumeInput(conn)) |
| | | { |
| | | handler(error(conn)); |
| | | return; |
| | | } |
| | | } |
| | | if (flags&(qtl::event::ef_read | qtl::event::ef_write | event::ef_exception)) |
| | | async_wait(event, conn, timeout, handler); |
| | | }); |
| | | } |
| | | else |
| | | { |
| | | event->set_io_handler(qtl::event::ef_read, 10, |
| | | [event, conn, timeout, handler](int flags) mutable { |
| | | if (flags&qtl::event::ef_timeout) |
| | | { |
| | | handler(postgres::timeout()); |
| | | } |
| | | else if (flags&(qtl::event::ef_read | qtl::event::ef_exception)) |
| | | { |
| | | if (PQconsumeInput(conn)) |
| | | { |
| | | if (!PQisBusy(conn)) |
| | | handler(postgres::error()); |
| | | else |
| | | async_wait(event, conn, timeout, handler); |
| | | } |
| | | else |
| | | { |
| | | handler(postgres::error(conn)); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | handler(postgres::error(conn)); |
| | | } |
| | | }); |
| | | } |
| | | } |
| | | |
| | | class async_statement : public base_statement |
| | | { |
| | | public: |
| | | async_statement(async_connection& db); |
| | | async_statement(async_statement&& src) |
| | | : base_statement(std::move(src)), m_timeout(2) |
| | | { |
| | | m_event = src.m_event; |
| | | m_timeout = src.m_timeout; |
| | | src.m_event = nullptr; |
| | | } |
| | | async_statement& operator=(async_statement&& src) |
| | | { |
| | | if (this != &src) |
| | | { |
| | | base_statement::operator =(std::move(src)); |
| | | m_event = src.m_event; |
| | | m_timeout = src.m_timeout; |
| | | src.m_event = nullptr; |
| | | } |
| | | return *this; |
| | | } |
| | | ~async_statement() |
| | | { |
| | | close(); |
| | | } |
| | | |
| | | /* |
| | | Handler defiens as: |
| | | void handler(const qtl::mysql::error& e); |
| | | */ |
| | | template<typename Handler> |
| | | void open(Handler&& handler, const char* command, int nParams = 0, const Oid *paramTypes = nullptr) |
| | | { |
| | | _name.resize(sizeof(intptr_t) * 2 + 1); |
| | | int n = sprintf(const_cast<char*>(_name.data()), "q%p", this); |
| | | _name.resize(n); |
| | | std::transform(_name.begin(), _name.end(), _name.begin(), tolower); |
| | | if (PQsendPrepare(m_conn, _name.data(), command, nParams, paramTypes)) |
| | | { |
| | | async_wait([this, handler](error e) mutable { |
| | | if (!e) |
| | | { |
| | | m_res = PQgetResult(m_conn); |
| | | if (m_res) |
| | | { |
| | | m_res.verify_error<PGRES_COMMAND_OK>(e); |
| | | while(m_res) |
| | | m_res = PQgetResult(m_conn); |
| | | } |
| | | } |
| | | handler(e); |
| | | }); |
| | | } |
| | | else |
| | | { |
| | | _name.clear(); |
| | | handler(error(m_conn)); |
| | | } |
| | | } |
| | | |
| | | template<typename Handler, typename... Types> |
| | | void open(Handler&& handler, const char* command) |
| | | { |
| | | auto binder_list = make_binder_list(Types()...); |
| | | std::array<Oid, sizeof...(Types)> types; |
| | | std::transform(binder_list.begin(), binder_list.end(), types.begin(), [](const binder& b) { |
| | | return b.type(); |
| | | }); |
| | | |
| | | open(std::forward<Handler>(handler), command, types.size(), types.data()); |
| | | } |
| | | |
| | | void close() |
| | | { |
| | | while (m_res) |
| | | { |
| | | m_res = PQgetResult(m_conn); |
| | | } |
| | | |
| | | if (!_name.empty()) |
| | | { |
| | | std::ostringstream oss; |
| | | oss << "DEALLOCATE " << _name << ";"; |
| | | result res = PQexec(m_conn, oss.str().data()); |
| | | error e; |
| | | res.verify_error<PGRES_COMMAND_OK, PGRES_TUPLES_OK>(e); |
| | | finish(res); |
| | | if(e) throw e; |
| | | } |
| | | base_statement::close(); |
| | | } |
| | | |
| | | template<typename Handler> |
| | | void close(Handler&& handler) |
| | | { |
| | | while (m_res) |
| | | { |
| | | if(PQisBusy(m_conn)) |
| | | { |
| | | async_wait([this, handler](const error& e) mutable { |
| | | close(handler); |
| | | }); |
| | | } |
| | | else |
| | | { |
| | | m_res = PQgetResult(m_conn); |
| | | } |
| | | } |
| | | |
| | | if (!_name.empty() && PQstatus(m_conn) == CONNECTION_OK) |
| | | { |
| | | std::ostringstream oss; |
| | | oss << "DEALLOCATE " << _name << ";"; |
| | | bool ok = PQsendQuery(m_conn, oss.str().data()); |
| | | if (ok) |
| | | { |
| | | async_wait([this, handler](postgres::error e) mutable { |
| | | if (PQstatus(m_conn) == CONNECTION_OK) |
| | | { |
| | | result res(PQgetResult(m_conn)); |
| | | if (res) |
| | | res.verify_error<PGRES_COMMAND_OK, PGRES_TUPLES_OK>(e); |
| | | if (!e) _name.clear(); |
| | | finish(res); |
| | | handler(e); |
| | | } |
| | | else |
| | | { |
| | | _name.clear(); |
| | | handler(error()); |
| | | } |
| | | }); |
| | | } |
| | | else |
| | | { |
| | | handler(error(m_conn)); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | _name.clear(); |
| | | } |
| | | } |
| | | |
| | | /* |
| | | ExecuteHandler defiens as: |
| | | void handler(const qtl::mysql::error& e, uint64_t affected); |
| | | */ |
| | | template<typename ExecuteHandler> |
| | | void execute(ExecuteHandler&& handler) |
| | | { |
| | | if (PQsendQueryPrepared(m_conn, _name.data(), 0, nullptr, nullptr, nullptr, 1) && |
| | | PQsetSingleRowMode(m_conn)) |
| | | { |
| | | async_wait([this, handler](error e) { |
| | | if (!e) |
| | | { |
| | | m_res = PQgetResult(m_conn); |
| | | m_res.verify_error<PGRES_COMMAND_OK, PGRES_SINGLE_TUPLE>(e); |
| | | finish(m_res); |
| | | } |
| | | handler(e); |
| | | }); |
| | | } |
| | | else |
| | | { |
| | | handler(error(m_conn)); |
| | | } |
| | | } |
| | | |
| | | template<typename Types, typename Handler> |
| | | void execute(const Types& params, Handler&& handler) |
| | | { |
| | | const size_t count = qtl::params_binder<statement, Types>::size; |
| | | if (count > 0) |
| | | { |
| | | m_binders.resize(count); |
| | | qtl::bind_params(*this, params); |
| | | |
| | | std::array<const char*, count> values; |
| | | std::array<int, count> lengths; |
| | | std::array<int, count> formats; |
| | | for (size_t i = 0; i != m_binders.size(); i++) |
| | | { |
| | | values[i] = m_binders[i].value(); |
| | | lengths[i] = static_cast<int>(m_binders[i].length()); |
| | | formats[i] = 1; |
| | | } |
| | | if (!PQsendQueryPrepared(m_conn, _name.data(), static_cast<int>(m_binders.size()), values.data(), lengths.data(), formats.data(), 1)) |
| | | { |
| | | handler(error(m_conn), 0); |
| | | return; |
| | | } |
| | | } |
| | | else |
| | | { |
| | | if (!PQsendQueryPrepared(m_conn, _name.data(), 0, nullptr, nullptr, nullptr, 1)) |
| | | { |
| | | handler(error(m_conn), 0); |
| | | return; |
| | | } |
| | | } |
| | | if (!PQsetSingleRowMode(m_conn)) |
| | | { |
| | | handler(error(m_conn), 0); |
| | | return; |
| | | } |
| | | if (PQisBusy(m_conn)) |
| | | { |
| | | async_wait([this, handler](error e) mutable { |
| | | if (!e) |
| | | { |
| | | m_res = PQgetResult(m_conn); |
| | | m_res.verify_error<PGRES_COMMAND_OK, PGRES_SINGLE_TUPLE>(e); |
| | | int64_t affected = m_res.affected_rows(); |
| | | finish(m_res); |
| | | handler(e, affected); |
| | | } |
| | | else |
| | | { |
| | | handler(e, 0); |
| | | } |
| | | }); |
| | | } |
| | | } |
| | | |
| | | template<typename Types, typename RowHandler, typename FinishHandler> |
| | | void fetch(Types&& values, RowHandler&& row_handler, FinishHandler&& finish_handler) |
| | | { |
| | | if (m_res) |
| | | { |
| | | ExecStatusType status = m_res.status(); |
| | | if (status == PGRES_SINGLE_TUPLE) |
| | | { |
| | | int count = m_res.get_column_count(); |
| | | if (count > 0) |
| | | { |
| | | m_binders.resize(count); |
| | | for (int i = 0; i != count; i++) |
| | | { |
| | | m_binders[i] = binder(m_res.get_value(0, i), m_res.length(0, i), |
| | | m_res.get_column_type(i)); |
| | | } |
| | | qtl::bind_record(*this, std::forward<Types>(values)); |
| | | } |
| | | row_handler(); |
| | | if (PQisBusy(m_conn)) |
| | | { |
| | | async_wait([this, &values, row_handler, finish_handler](const error& e) { |
| | | if (e) |
| | | { |
| | | finish_handler(e); |
| | | } |
| | | else |
| | | { |
| | | m_res = PQgetResult(m_conn); |
| | | fetch(std::forward<Types>(values), row_handler, finish_handler); |
| | | } |
| | | }); |
| | | } |
| | | else |
| | | { |
| | | m_res = PQgetResult(m_conn); |
| | | fetch(std::forward<Types>(values), row_handler, finish_handler); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | error e; |
| | | m_res.verify_error<PGRES_TUPLES_OK>(e); |
| | | finish_handler(e); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | finish_handler(error()); |
| | | } |
| | | } |
| | | |
| | | template<typename Handler> |
| | | void next_result(Handler&& handler) |
| | | { |
| | | async_wait([this, handler](const error& e) { |
| | | if (e) |
| | | { |
| | | handler(e); |
| | | } |
| | | else |
| | | { |
| | | m_res = PQgetResult(m_conn); |
| | | handler(error()); |
| | | } |
| | | }); |
| | | } |
| | | |
| | | private: |
| | | event* m_event; |
| | | int m_timeout; |
| | | template<typename Handler> |
| | | void async_wait(Handler&& handler) |
| | | { |
| | | qtl::postgres::async_wait(m_event, m_conn, m_timeout, std::forward<Handler>(handler)); |
| | | } |
| | | }; |
| | | |
| | | class async_connection : public base_database, public qtl::async_connection<async_connection, async_statement> |
| | | { |
| | | public: |
| | | async_connection() : m_connect_timeout(2), m_query_timeout(2) |
| | | { |
| | | } |
| | | async_connection(async_connection&& src) |
| | | : base_database(std::move(src)), m_connect_timeout(src.m_connect_timeout), m_query_timeout(src.m_query_timeout) |
| | | { |
| | | } |
| | | async_connection& operator=(async_connection&& src) |
| | | { |
| | | if (this != &src) |
| | | { |
| | | base_database::operator=(std::move(src)); |
| | | m_connect_timeout = src.m_connect_timeout; |
| | | m_query_timeout = src.m_query_timeout; |
| | | } |
| | | return *this; |
| | | } |
| | | |
| | | /* |
| | | OpenHandler defines as: |
| | | void handler(const qtl::postgres::error& e) NOEXCEPT; |
| | | */ |
| | | template<typename EventLoop, typename OpenHandler> |
| | | void open(EventLoop& ev, OpenHandler&& handler, const std::map<std::string, std::string>& params, bool expand_dbname = false) |
| | | { |
| | | std::vector<const char*> keywords; |
| | | std::vector<const char*> values; |
| | | keywords.reserve(params.size()); |
| | | values.reserve(params.size()); |
| | | for (auto& param : params) |
| | | { |
| | | keywords.push_back(param.first.data()); |
| | | values.push_back(param.second.data()); |
| | | } |
| | | keywords.push_back(nullptr); |
| | | values.push_back(nullptr); |
| | | m_conn = PQconnectStartParams(keywords.data(), values.data(), expand_dbname); |
| | | if (m_conn == nullptr) |
| | | throw std::bad_alloc(); |
| | | if (status() == CONNECTION_BAD) |
| | | { |
| | | handler(error(m_conn)); |
| | | return; |
| | | } |
| | | |
| | | if (PQsetnonblocking(m_conn, true)!=0) |
| | | handler(error(m_conn)); |
| | | get_options(); |
| | | bind(ev); |
| | | wait_connect(std::forward<OpenHandler>(handler)); |
| | | } |
| | | |
| | | template<typename EventLoop, typename OpenHandler> |
| | | void open(EventLoop& ev, OpenHandler&& handler, const char * conninfo) |
| | | { |
| | | m_conn = PQconnectStart(conninfo); |
| | | if (m_conn == nullptr) |
| | | throw std::bad_alloc(); |
| | | if (status() == CONNECTION_BAD) |
| | | { |
| | | handler(error(m_conn)); |
| | | return; |
| | | } |
| | | |
| | | PQsetnonblocking(m_conn, true); |
| | | get_options(); |
| | | bind(ev); |
| | | wait_connect(std::forward<OpenHandler>(handler)); |
| | | } |
| | | |
| | | template<typename OpenHandler> |
| | | void reset(OpenHandler&& handler) |
| | | { |
| | | PQresetStart(m_conn); |
| | | wait_reset(std::forward<OpenHandler>(handler)); |
| | | } |
| | | |
| | | /* |
| | | Handler defines as: |
| | | void handler(const qtl::mysql::error& e, uint64_t affected) NOEXCEPT; |
| | | */ |
| | | template<typename ExecuteHandler> |
| | | void simple_execute(ExecuteHandler&& handler, const char* query_text) NOEXCEPT |
| | | { |
| | | bool ok = PQsendQuery(m_conn, query_text); |
| | | if (ok) |
| | | { |
| | | async_wait([this, handler](postgres::error e) mutable { |
| | | result res(PQgetResult(m_conn)); |
| | | res.verify_error<PGRES_COMMAND_OK, PGRES_TUPLES_OK>(e); |
| | | uint64_t affected = res.affected_rows(); |
| | | handler(e, affected); |
| | | while (res) |
| | | res = PQgetResult(m_conn); |
| | | }); |
| | | } |
| | | else |
| | | { |
| | | handler(error(m_conn), 0); |
| | | } |
| | | } |
| | | |
| | | template<typename Handler> |
| | | void auto_commit(Handler&& handler, bool on) NOEXCEPT |
| | | { |
| | | simple_execute(std::forward<Handler>(handler), |
| | | on ? "SET AUTOCOMMIT TO ON" : "SET AUTOCOMMIT TO OFF"); |
| | | } |
| | | |
| | | template<typename Handler> |
| | | void begin_transaction(Handler&& handler) NOEXCEPT |
| | | { |
| | | simple_execute(std::forward<Handler>(handler), "BEGIN"); |
| | | } |
| | | |
| | | template<typename Handler> |
| | | void rollback(Handler&& handler) NOEXCEPT |
| | | { |
| | | simple_execute(std::forward<Handler>(handler), "ROLLBACK"); |
| | | } |
| | | |
| | | template<typename Handler> |
| | | void commit(Handler&& handler) NOEXCEPT |
| | | { |
| | | simple_execute(std::forward<Handler>(handler), "COMMIT"); |
| | | } |
| | | |
| | | /* |
| | | ResultHandler defines as: |
| | | void result_handler(const qtl::postgres::error& e) NOEXCEPT; |
| | | */ |
| | | template<typename RowHandler, typename ResultHandler> |
| | | void simple_query(const char* query, RowHandler&& row_handler, ResultHandler&& result_handler) NOEXCEPT |
| | | { |
| | | bool ok = PQsendQuery(m_conn, query); |
| | | if (ok) |
| | | { |
| | | async_wait([this, row_handler, result_handler](postgres::error e) mutable { |
| | | result res(PQgetResult(m_conn)); |
| | | res.verify_error<PGRES_COMMAND_OK, PGRES_TUPLES_OK>(e); |
| | | if (e) |
| | | { |
| | | result_handler(e, 0); |
| | | return; |
| | | } |
| | | uint64_t affected = res.affected_rows(); |
| | | while (res && res.status() == PGRES_TUPLES_OK) |
| | | { |
| | | simple_statment stmt(*this, std::move(res)); |
| | | stmt.fetch_all(row_handler); |
| | | res = PQgetResult(m_conn); |
| | | } |
| | | result_handler(e, affected); |
| | | }); |
| | | } |
| | | else |
| | | { |
| | | result_handler(error(m_conn), 0); |
| | | } |
| | | } |
| | | |
| | | template<typename Handler> |
| | | void open_command(const char* query_text, size_t /*text_length*/, Handler&& handler) |
| | | { |
| | | std::shared_ptr<async_statement> stmt = std::make_shared<async_statement>(*this); |
| | | stmt->open([stmt, handler](const postgres::error& e) mutable { |
| | | handler(e, stmt); |
| | | }, query_text, 0); |
| | | } |
| | | |
| | | template<typename Handler> |
| | | bool is_alive(Handler&& handler) NOEXCEPT |
| | | { |
| | | simple_execute(std::forward<Handler>(handler), ""); |
| | | } |
| | | |
| | | socket_type socket() const NOEXCEPT { return PQsocket(m_conn); } |
| | | |
| | | int connect_timeout() const { return m_connect_timeout; } |
| | | void connect_timeout(int timeout) { m_connect_timeout = timeout; } |
| | | int query_timeout() const { return m_query_timeout; } |
| | | void query_timeout(int timeout) { m_query_timeout = timeout; } |
| | | |
| | | private: |
| | | int m_connect_timeout; |
| | | int m_query_timeout; |
| | | |
| | | void get_options() |
| | | { |
| | | PQconninfoOption* options = PQconninfo(m_conn); |
| | | m_connect_timeout = 2; |
| | | for (PQconninfoOption* option = options; option; option++) |
| | | { |
| | | if (strcmp(option->keyword, "connect_timeout") == 0) |
| | | { |
| | | if (option->val) |
| | | m_connect_timeout = atoi(option->val); |
| | | break; |
| | | } |
| | | } |
| | | PQconninfoFree(options); |
| | | } |
| | | |
| | | template<typename OpenHandler> |
| | | void wait_connect(OpenHandler&& handler) NOEXCEPT |
| | | { |
| | | PostgresPollingStatusType status = PQconnectPoll(m_conn); |
| | | switch (status) |
| | | { |
| | | case PGRES_POLLING_READING: |
| | | case PGRES_POLLING_WRITING: |
| | | m_event_handler->set_io_handler(event_flags(status), m_connect_timeout, |
| | | [this, handler](int flags) mutable { |
| | | if (flags&event::ef_timeout) |
| | | { |
| | | handler(postgres::timeout()); |
| | | } |
| | | else if(flags&(event::ef_read|event::ef_write | event::ef_exception)) |
| | | wait_connect(std::forward<OpenHandler>(handler)); |
| | | }); |
| | | break; |
| | | case PGRES_POLLING_FAILED: |
| | | handler(postgres::error(handle())); |
| | | break; |
| | | case PGRES_POLLING_OK: |
| | | //PQsetnonblocking(m_conn, true); |
| | | handler(postgres::error()); |
| | | } |
| | | } |
| | | |
| | | template<typename OpenHandler> |
| | | void wait_reset(OpenHandler&& handler) NOEXCEPT |
| | | { |
| | | PostgresPollingStatusType status = PQresetPoll(m_conn); |
| | | switch (status) |
| | | { |
| | | case PGRES_POLLING_READING: |
| | | case PGRES_POLLING_WRITING: |
| | | m_event_handler->set_io_handler(event_flags(status), m_connect_timeout, |
| | | [this, handler](int flags) mutable { |
| | | if (flags&event::ef_timeout) |
| | | { |
| | | handler(postgres::timeout()); |
| | | } |
| | | else if (flags&(event::ef_read | event::ef_write | event::ef_exception)) |
| | | wait_reset(std::forward<OpenHandler>(handler)); |
| | | }); |
| | | break; |
| | | case PGRES_POLLING_FAILED: |
| | | handler(postgres::error(m_conn)); |
| | | break; |
| | | case PGRES_POLLING_OK: |
| | | handler(postgres::error()); |
| | | } |
| | | } |
| | | |
| | | template<typename Handler> |
| | | void async_wait(Handler&& handler) |
| | | { |
| | | qtl::postgres::async_wait(event(), m_conn, m_query_timeout, std::forward<Handler>(handler)); |
| | | } |
| | | |
| | | }; |
| | | |
| | | inline async_statement::async_statement(async_connection& db) |
| | | : base_statement(static_cast<base_database&>(db)) |
| | | { |
| | | m_event = db.event(); |
| | | m_timeout = db.query_timeout(); |
| | | } |
| | | |
| | | |
| | | typedef qtl::transaction<database> transaction; |
| | | |
| | | template<typename Record> |