znone
2021-03-25 9b81cdebba95f1b6e687191c630c1f8f6cf0df16
include/qtl_postgres.hpp
@@ -13,6 +13,7 @@
#include <algorithm>
#include <assert.h>
#include "qtl_common.hpp"
#include "qtl_async.hpp"
#define FRONTEND
@@ -26,7 +27,6 @@
extern "C"
{
#include <c.h>
#include <postgres.h>
#include <catalog/pg_type.h>
}
@@ -164,11 +164,11 @@
class error : public std::exception
{
public:
   error() : m_errmsg(nullptr) { }
   error() : m_errmsg() { }
   explicit error(PGconn* conn, PGVerbosity verbosity = PQERRORS_DEFAULT, PGContextVisibility show_context = PQSHOW_CONTEXT_ERRORS)
   {
      PQsetErrorVerbosity(conn, verbosity);
      PQsetErrorContextVisibility(conn, show_context);
      //PQsetErrorVerbosity(conn, verbosity);
      //PQsetErrorContextVisibility(conn, show_context);
      const char* errmsg = PQerrorMessage(conn);
      if (errmsg) m_errmsg = errmsg;
      else m_errmsg.clear();
@@ -182,9 +182,19 @@
   }
   virtual const char* what() const NOEXCEPT override { return m_errmsg.data(); }
   operator bool() const { return !m_errmsg.empty(); }
private:
protected:
   std::string m_errmsg;
};
class timeout : public error
{
public:
   timeout()
   {
      m_errmsg = "timeout";
   }
};
inline void verify_pgtypes_error(int ret)
@@ -992,7 +1002,7 @@
         if (end - data < size)
            throw std::overflow_error("insufficient data left in message");
         data = object_traits<T>::get(value, data, data + size);
         if (data >= end)
         if (data > end)
            throw std::overflow_error("insufficient data left in message");
         result.push_back(value);
      }
@@ -1494,6 +1504,17 @@
      }
   }
   template<ExecStatusType... Excepted>
   void verify_error(error& e)
   {
      if (m_res)
      {
         ExecStatusType got = status();
         if (!in<ExecStatusType, Excepted...>(got))
            e = error(m_res);
      }
   }
   void clear()
   {
      if (m_res)
@@ -1517,15 +1538,27 @@
    }
    base_statement(const base_statement&) = delete;
    base_statement(base_statement&& src) 
       : m_conn(src.m_conn), m_binders(std::move(src.m_binders)), m_res(std::move(src.m_res))
       : m_conn(src.m_conn), m_binders(std::move(src.m_binders)), m_res(std::move(src.m_res)), _name(std::move(src._name))
    {
    }
    base_statement& operator=(const base_statement&) = delete;
    base_statement& operator=(base_statement&& src)
    {
       if (this != &src)
       {
          close();
          m_conn = src.m_conn;
          m_binders = std::move(src.m_binders);
          m_res = std::move(src.m_res);
       }
       return *this;
    }
   result& get_result() { return m_res; }
   void close()
   {
      m_res=nullptr;
      m_res = nullptr;
   }
   uint64_t affetced_rows() const
@@ -1614,6 +1647,7 @@
protected:
   PGconn* m_conn;
   result m_res;
   std::string _name;
   std::vector<binder> m_binders;
   template<ExecStatusType... Excepted>
@@ -1624,6 +1658,13 @@
      else
         throw error(m_conn);
   }
   void finish(result& res)
   {
      while (res)
      {
         res = PQgetResult(m_conn);
      }
   }
};
class statement : public base_statement
@@ -1633,7 +1674,7 @@
   {
   }
   statement(const statement&) = delete;
   statement(statement&& src) : base_statement(std::move(src)), _name(std::move(src._name))
   statement(statement&& src) : base_statement(std::move(src))
   {
   }
@@ -1761,17 +1802,6 @@
      finish(m_res);
      m_res.clear();
   }
private:
   std::string _name;
   void finish(result& res)
   {
      while (res)
      {
         res = PQgetResult(m_conn);
      }
   }
};
class base_database
@@ -1783,6 +1813,8 @@
   }
public:
   typedef postgres::error exception_type;
   base_database(const base_database&) = delete;
   base_database(base_database&& src)
   {
@@ -1796,8 +1828,8 @@
         PQfinish(m_conn);
   }
   base_database& operator==(const base_database&) = delete;
   base_database& operator==(base_database&& src)
   base_database& operator=(const base_database&) = delete;
   base_database& operator=(base_database&& src)
   {
      if (this != &src)
      {
@@ -1882,12 +1914,10 @@
         PQreset(m_conn);
   }
   bool is_alive()
   void close()
   {
   }
   PGPing ping()
   {
      PQfinish(m_conn);
      m_conn = nullptr;
   }
protected:
@@ -1920,7 +1950,7 @@
                  m_res.get_column_type(j));
            }
            qtl::bind_record(*this, std::forward<decltype(values)>(values));
            proc(values);
            qtl::detail::apply(proc, std::forward<decltype(values)>(values));
         }
      }
   }
@@ -1959,12 +1989,6 @@
      sprintf(port_text, "%u", port);
      m_conn = PQsetdbLogin(host, port_text, options, nullptr, db, user, password);
      return m_conn != nullptr && status() == CONNECTION_OK;
   }
   void close()
   {
      PQfinish(m_conn);
      m_conn = nullptr;
   }
   statement open_command(const char* query_text, size_t /*text_length*/)
@@ -2023,8 +2047,669 @@
      simple_execute("COMMIT");
   }
   bool is_alive()
   {
      qtl::postgres::result res(PQexec(m_conn, ""));
      return res && res.status() == PGRES_COMMAND_OK;
   }
};
inline int event_flags(PostgresPollingStatusType status)
{
   int flags = 0;
   if (status == PGRES_POLLING_READING)
      flags |= event::ef_read;
   else if (status == PGRES_POLLING_WRITING)
      flags |= event::ef_write;
   else if (status == PGRES_POLLING_FAILED)
      flags |= event::ef_exception;
   return flags;
}
class async_connection;
template<typename Handler>
inline void async_wait(qtl::event* event, PGconn* conn, int timeout, Handler&& handler)
{
   int flushed = PQflush(conn);
   if (flushed < 0)
   {
      handler(error(conn));
      return;
   }
   if (flushed == 1)
   {
      event->set_io_handler(qtl::event::ef_read | qtl::event::ef_write, timeout,
         [event, conn, timeout, handler](int flags) mutable {
         if (flags&qtl::event::ef_timeout)
         {
            handler(postgres::timeout());
            return;
         }
         if (flags&qtl::event::ef_read)
         {
            if (!PQconsumeInput(conn))
            {
               handler(error(conn));
               return;
            }
         }
         if (flags&(qtl::event::ef_read | qtl::event::ef_write | event::ef_exception))
            async_wait(event, conn, timeout, handler);
      });
   }
   else
   {
      event->set_io_handler(qtl::event::ef_read, 10,
         [event, conn, timeout, handler](int flags) mutable {
         if (flags&qtl::event::ef_timeout)
         {
            handler(postgres::timeout());
         }
         else if (flags&(qtl::event::ef_read | qtl::event::ef_exception))
         {
            if (PQconsumeInput(conn))
            {
               if (!PQisBusy(conn))
                  handler(postgres::error());
               else
                  async_wait(event, conn, timeout, handler);
            }
            else
            {
               handler(postgres::error(conn));
            }
         }
         else
         {
            handler(postgres::error(conn));
         }
      });
   }
}
class async_statement : public base_statement
{
public:
   async_statement(async_connection& db);
   async_statement(async_statement&& src)
      : base_statement(std::move(src)), m_timeout(2)
   {
      m_event = src.m_event;
      m_timeout = src.m_timeout;
      src.m_event = nullptr;
   }
   async_statement& operator=(async_statement&& src)
   {
      if (this != &src)
      {
         base_statement::operator =(std::move(src));
         m_event = src.m_event;
         m_timeout = src.m_timeout;
         src.m_event = nullptr;
      }
      return *this;
   }
   ~async_statement()
   {
      close();
   }
   /*
      Handler defiens as:
      void handler(const qtl::mysql::error& e);
    */
   template<typename Handler>
   void open(Handler&& handler, const char* command, int nParams = 0, const Oid *paramTypes = nullptr)
   {
      _name.resize(sizeof(intptr_t) * 2 + 1);
      int n = sprintf(const_cast<char*>(_name.data()), "q%p", this);
      _name.resize(n);
      std::transform(_name.begin(), _name.end(), _name.begin(), tolower);
      if (PQsendPrepare(m_conn, _name.data(), command, nParams, paramTypes))
      {
         async_wait([this, handler](error e) mutable {
            if (!e)
            {
               m_res = PQgetResult(m_conn);
               if (m_res)
               {
                  m_res.verify_error<PGRES_COMMAND_OK>(e);
                  while(m_res)
                     m_res = PQgetResult(m_conn);
               }
            }
            handler(e);
         });
      }
      else
      {
         _name.clear();
         handler(error(m_conn));
      }
   }
   template<typename Handler, typename... Types>
   void open(Handler&& handler, const char* command)
   {
      auto binder_list = make_binder_list(Types()...);
      std::array<Oid, sizeof...(Types)> types;
      std::transform(binder_list.begin(), binder_list.end(), types.begin(), [](const binder& b) {
         return b.type();
      });
      open(std::forward<Handler>(handler), command, types.size(), types.data());
   }
   void close()
   {
      while (m_res)
      {
         m_res = PQgetResult(m_conn);
      }
      if (!_name.empty())
      {
         std::ostringstream oss;
         oss << "DEALLOCATE " << _name << ";";
         result res = PQexec(m_conn, oss.str().data());
         error e;
         res.verify_error<PGRES_COMMAND_OK, PGRES_TUPLES_OK>(e);
         finish(res);
         if(e) throw e;
      }
      base_statement::close();
   }
   template<typename Handler>
   void close(Handler&& handler)
   {
      while (m_res)
      {
         if(PQisBusy(m_conn))
         {
            async_wait([this, handler](const error& e) mutable {
               close(handler);
            });
         }
         else
         {
            m_res = PQgetResult(m_conn);
         }
      }
      if (!_name.empty() && PQstatus(m_conn) == CONNECTION_OK)
      {
         std::ostringstream oss;
         oss << "DEALLOCATE " << _name << ";";
         bool ok = PQsendQuery(m_conn, oss.str().data());
         if (ok)
         {
            async_wait([this, handler](postgres::error e) mutable {
               if (PQstatus(m_conn) == CONNECTION_OK)
               {
                  result res(PQgetResult(m_conn));
                  if (res)
                     res.verify_error<PGRES_COMMAND_OK, PGRES_TUPLES_OK>(e);
                  if (!e) _name.clear();
                  finish(res);
                  handler(e);
               }
               else
               {
                  _name.clear();
                  handler(error());
               }
            });
         }
         else
         {
            handler(error(m_conn));
         }
      }
      else
      {
         _name.clear();
      }
   }
   /*
      ExecuteHandler defiens as:
      void handler(const qtl::mysql::error& e, uint64_t affected);
    */
   template<typename ExecuteHandler>
   void execute(ExecuteHandler&& handler)
   {
      if (PQsendQueryPrepared(m_conn, _name.data(), 0, nullptr, nullptr, nullptr, 1) &&
         PQsetSingleRowMode(m_conn))
      {
         async_wait([this, handler](error e) {
            if (!e)
            {
               m_res = PQgetResult(m_conn);
               m_res.verify_error<PGRES_COMMAND_OK, PGRES_SINGLE_TUPLE>(e);
               finish(m_res);
            }
            handler(e);
         });
      }
      else
      {
         handler(error(m_conn));
      }
   }
   template<typename Types, typename Handler>
   void execute(const Types& params, Handler&& handler)
   {
      const size_t count = qtl::params_binder<statement, Types>::size;
      if (count > 0)
      {
         m_binders.resize(count);
         qtl::bind_params(*this, params);
         std::array<const char*, count> values;
         std::array<int, count> lengths;
         std::array<int, count> formats;
         for (size_t i = 0; i != m_binders.size(); i++)
         {
            values[i] = m_binders[i].value();
            lengths[i] = static_cast<int>(m_binders[i].length());
            formats[i] = 1;
         }
         if (!PQsendQueryPrepared(m_conn, _name.data(), static_cast<int>(m_binders.size()), values.data(), lengths.data(), formats.data(), 1))
         {
            handler(error(m_conn), 0);
            return;
         }
      }
      else
      {
         if (!PQsendQueryPrepared(m_conn, _name.data(), 0, nullptr, nullptr, nullptr, 1))
         {
            handler(error(m_conn), 0);
            return;
         }
      }
      if (!PQsetSingleRowMode(m_conn))
      {
         handler(error(m_conn), 0);
         return;
      }
      if (PQisBusy(m_conn))
      {
         async_wait([this, handler](error e) mutable {
            if (!e)
            {
               m_res = PQgetResult(m_conn);
               m_res.verify_error<PGRES_COMMAND_OK, PGRES_SINGLE_TUPLE>(e);
               int64_t affected = m_res.affected_rows();
               finish(m_res);
               handler(e, affected);
            }
            else
            {
               handler(e, 0);
            }
         });
      }
   }
   template<typename Types, typename RowHandler, typename FinishHandler>
   void fetch(Types&& values, RowHandler&& row_handler, FinishHandler&& finish_handler)
   {
      if (m_res)
      {
         ExecStatusType status = m_res.status();
         if (status == PGRES_SINGLE_TUPLE)
         {
            int count = m_res.get_column_count();
            if (count > 0)
            {
               m_binders.resize(count);
               for (int i = 0; i != count; i++)
               {
                  m_binders[i] = binder(m_res.get_value(0, i), m_res.length(0, i),
                     m_res.get_column_type(i));
               }
               qtl::bind_record(*this, std::forward<Types>(values));
            }
            row_handler();
            if (PQisBusy(m_conn))
            {
               async_wait([this, &values, row_handler, finish_handler](const error& e) {
                  if (e)
                  {
                     finish_handler(e);
                  }
                  else
                  {
                     m_res = PQgetResult(m_conn);
                     fetch(std::forward<Types>(values), row_handler, finish_handler);
                  }
               });
            }
            else
            {
               m_res = PQgetResult(m_conn);
               fetch(std::forward<Types>(values), row_handler, finish_handler);
            }
         }
         else
         {
            error e;
            m_res.verify_error<PGRES_TUPLES_OK>(e);
            finish_handler(e);
         }
      }
      else
      {
         finish_handler(error());
      }
   }
   template<typename Handler>
   void next_result(Handler&& handler)
   {
      async_wait([this, handler](const error& e) {
         if (e)
         {
            handler(e);
         }
         else
         {
            m_res = PQgetResult(m_conn);
            handler(error());
         }
      });
   }
private:
   event* m_event;
   int m_timeout;
   template<typename Handler>
   void async_wait(Handler&& handler)
   {
      qtl::postgres::async_wait(m_event, m_conn, m_timeout, std::forward<Handler>(handler));
   }
};
class async_connection : public base_database, public qtl::async_connection<async_connection, async_statement>
{
public:
   async_connection() : m_connect_timeout(2), m_query_timeout(2)
   {
   }
   async_connection(async_connection&& src)
      : base_database(std::move(src)), m_connect_timeout(src.m_connect_timeout), m_query_timeout(src.m_query_timeout)
   {
   }
   async_connection& operator=(async_connection&& src)
   {
      if (this != &src)
      {
         base_database::operator=(std::move(src));
         m_connect_timeout = src.m_connect_timeout;
         m_query_timeout = src.m_query_timeout;
      }
      return *this;
   }
   /*
      OpenHandler defines as:
         void handler(const qtl::postgres::error& e) NOEXCEPT;
   */
   template<typename EventLoop, typename OpenHandler>
   void open(EventLoop& ev, OpenHandler&& handler, const std::map<std::string, std::string>& params, bool expand_dbname = false)
   {
      std::vector<const char*> keywords;
      std::vector<const char*> values;
      keywords.reserve(params.size());
      values.reserve(params.size());
      for (auto& param : params)
      {
         keywords.push_back(param.first.data());
         values.push_back(param.second.data());
      }
      keywords.push_back(nullptr);
      values.push_back(nullptr);
      m_conn = PQconnectStartParams(keywords.data(), values.data(), expand_dbname);
      if (m_conn == nullptr)
         throw std::bad_alloc();
      if (status() == CONNECTION_BAD)
      {
         handler(error(m_conn));
         return;
      }
      if (PQsetnonblocking(m_conn, true)!=0)
         handler(error(m_conn));
      get_options();
      bind(ev);
      wait_connect(std::forward<OpenHandler>(handler));
   }
   template<typename EventLoop, typename OpenHandler>
   void open(EventLoop& ev, OpenHandler&& handler, const char * conninfo)
   {
      m_conn = PQconnectStart(conninfo);
      if (m_conn == nullptr)
         throw std::bad_alloc();
      if (status() == CONNECTION_BAD)
      {
         handler(error(m_conn));
         return;
      }
      PQsetnonblocking(m_conn, true);
      get_options();
      bind(ev);
      wait_connect(std::forward<OpenHandler>(handler));
   }
   template<typename OpenHandler>
   void reset(OpenHandler&& handler)
   {
      PQresetStart(m_conn);
      wait_reset(std::forward<OpenHandler>(handler));
   }
   /*
      Handler defines as:
         void handler(const qtl::mysql::error& e, uint64_t affected) NOEXCEPT;
   */
   template<typename ExecuteHandler>
   void simple_execute(ExecuteHandler&& handler, const char* query_text) NOEXCEPT
   {
      bool ok = PQsendQuery(m_conn, query_text);
      if (ok)
      {
         async_wait([this, handler](postgres::error e) mutable {
            result res(PQgetResult(m_conn));
            res.verify_error<PGRES_COMMAND_OK, PGRES_TUPLES_OK>(e);
            uint64_t affected = res.affected_rows();
            handler(e, affected);
            while (res)
               res = PQgetResult(m_conn);
         });
      }
      else
      {
         handler(error(m_conn), 0);
      }
   }
   template<typename Handler>
   void auto_commit(Handler&& handler, bool on) NOEXCEPT
   {
      simple_execute(std::forward<Handler>(handler),
         on ? "SET AUTOCOMMIT TO ON" : "SET AUTOCOMMIT TO OFF");
   }
   template<typename Handler>
   void begin_transaction(Handler&& handler) NOEXCEPT
   {
      simple_execute(std::forward<Handler>(handler), "BEGIN");
   }
   template<typename Handler>
   void rollback(Handler&& handler) NOEXCEPT
   {
      simple_execute(std::forward<Handler>(handler), "ROLLBACK");
   }
   template<typename Handler>
   void commit(Handler&& handler) NOEXCEPT
   {
      simple_execute(std::forward<Handler>(handler), "COMMIT");
   }
   /*
      ResultHandler defines as:
         void result_handler(const qtl::postgres::error& e) NOEXCEPT;
   */
   template<typename RowHandler, typename ResultHandler>
   void simple_query(const char* query, RowHandler&& row_handler, ResultHandler&& result_handler) NOEXCEPT
   {
      bool ok = PQsendQuery(m_conn, query);
      if (ok)
      {
         async_wait([this, row_handler, result_handler](postgres::error e) mutable {
            result res(PQgetResult(m_conn));
            res.verify_error<PGRES_COMMAND_OK, PGRES_TUPLES_OK>(e);
            if (e)
            {
               result_handler(e, 0);
               return;
            }
            uint64_t affected = res.affected_rows();
            while (res && res.status() == PGRES_TUPLES_OK)
            {
               simple_statment stmt(*this, std::move(res));
               stmt.fetch_all(row_handler);
               res = PQgetResult(m_conn);
            }
            result_handler(e, affected);
         });
      }
      else
      {
         result_handler(error(m_conn), 0);
      }
   }
   template<typename Handler>
   void open_command(const char* query_text, size_t /*text_length*/, Handler&& handler)
   {
      std::shared_ptr<async_statement> stmt = std::make_shared<async_statement>(*this);
      stmt->open([stmt, handler](const postgres::error& e) mutable {
         handler(e, stmt);
      }, query_text, 0);
   }
   template<typename Handler>
   bool is_alive(Handler&& handler) NOEXCEPT
   {
      simple_execute(std::forward<Handler>(handler), "");
   }
   socket_type socket() const NOEXCEPT { return PQsocket(m_conn); }
   int connect_timeout() const { return m_connect_timeout; }
   void connect_timeout(int timeout) { m_connect_timeout = timeout; }
   int query_timeout() const { return m_query_timeout; }
   void query_timeout(int timeout) { m_query_timeout = timeout; }
private:
   int m_connect_timeout;
   int m_query_timeout;
   void get_options()
   {
      PQconninfoOption* options = PQconninfo(m_conn);
      m_connect_timeout = 2;
      for (PQconninfoOption* option = options; option; option++)
      {
         if (strcmp(option->keyword, "connect_timeout") == 0)
         {
            if (option->val)
               m_connect_timeout = atoi(option->val);
            break;
         }
      }
      PQconninfoFree(options);
   }
   template<typename OpenHandler>
   void wait_connect(OpenHandler&& handler) NOEXCEPT
   {
      PostgresPollingStatusType status = PQconnectPoll(m_conn);
      switch (status)
      {
      case PGRES_POLLING_READING:
      case PGRES_POLLING_WRITING:
         m_event_handler->set_io_handler(event_flags(status), m_connect_timeout,
            [this, handler](int flags) mutable {
            if (flags&event::ef_timeout)
            {
               handler(postgres::timeout());
            }
            else if(flags&(event::ef_read|event::ef_write | event::ef_exception))
               wait_connect(std::forward<OpenHandler>(handler));
         });
         break;
      case PGRES_POLLING_FAILED:
         handler(postgres::error(handle()));
         break;
      case PGRES_POLLING_OK:
         //PQsetnonblocking(m_conn, true);
         handler(postgres::error());
      }
   }
   template<typename OpenHandler>
   void wait_reset(OpenHandler&& handler) NOEXCEPT
   {
      PostgresPollingStatusType status = PQresetPoll(m_conn);
      switch (status)
      {
      case PGRES_POLLING_READING:
      case PGRES_POLLING_WRITING:
         m_event_handler->set_io_handler(event_flags(status), m_connect_timeout,
            [this, handler](int flags) mutable {
            if (flags&event::ef_timeout)
            {
               handler(postgres::timeout());
            }
            else if (flags&(event::ef_read | event::ef_write | event::ef_exception))
               wait_reset(std::forward<OpenHandler>(handler));
         });
         break;
      case PGRES_POLLING_FAILED:
         handler(postgres::error(m_conn));
         break;
      case PGRES_POLLING_OK:
         handler(postgres::error());
      }
   }
   template<typename Handler>
   void async_wait(Handler&& handler)
   {
      qtl::postgres::async_wait(event(), m_conn, m_query_timeout, std::forward<Handler>(handler));
   }
};
inline async_statement::async_statement(async_connection& db)
   : base_statement(static_cast<base_database&>(db))
{
   m_event = db.event();
   m_timeout = db.query_timeout();
}
typedef qtl::transaction<database> transaction;
template<typename Record>