#ifndef _SQL_POSTGRES_H_ #define _SQL_POSTGRES_H_ #pragma once #include #include #include #include #include #include #include #include #include #include "qtl_common.hpp" #include "qtl_async.hpp" #define FRONTEND #include #include #include #include #include #include #include extern "C" { #include #include } #ifdef open #undef open #endif // open #ifdef vsnprintf #undef vsnprintf #endif #ifdef snprintf #undef snprintf #endif #ifdef sprintf #undef sprintf #endif #ifdef vfprintf #undef vfprintf #endif #ifdef fprintf #undef fprintf #endif #ifdef printf #undef printf #endif #ifdef rename #undef rename #endif #ifdef unlink #undef unlink #endif #if defined(_WIN32) && _WIN32_WINNT < 0x0601 #ifdef _M_IX86 #define _WS2_32_WINSOCK_SWAP_LONGLONG(l) \ ((((l) >> 56) & 0x00000000000000FFLL) | \ (((l) >> 40) & 0x000000000000FF00LL) | \ (((l) >> 24) & 0x0000000000FF0000LL) | \ (((l) >> 8) & 0x00000000FF000000LL) | \ (((l) << 8) & 0x000000FF00000000LL) | \ (((l) << 24) & 0x0000FF0000000000LL) | \ (((l) << 40) & 0x00FF000000000000LL) | \ (((l) << 56) & 0xFF00000000000000LL)) #ifndef htonll __inline unsigned __int64 htonll(unsigned __int64 Value) { const unsigned __int64 Retval = _WS2_32_WINSOCK_SWAP_LONGLONG(Value); return Retval; } #endif /* htonll */ #ifndef ntohll __inline unsigned __int64 ntohll(unsigned __int64 Value) { const unsigned __int64 Retval = _WS2_32_WINSOCK_SWAP_LONGLONG(Value); return Retval; } #endif /* ntohll */ #endif #endif namespace qtl { namespace postgres { namespace detail { inline int16_t ntoh(int16_t v) { return static_cast(ntohs(v)); } inline uint16_t ntoh(uint16_t v) { return ntohs(v); } inline int32_t ntoh(int32_t v) { return static_cast(ntohl(v)); } inline uint32_t ntoh(uint32_t v) { return ntohl(v); } inline uint64_t ntoh(uint64_t v) { #ifdef _WIN32 return ntohll(v); #else return be64toh(v); #endif } inline int64_t ntoh(int64_t v) { return ntoh(static_cast(v)); } template ::value && !std::is_const::value>::type> inline T &ntoh_inplace(T &v) { v = ntoh(v); return v; } inline int16_t hton(int16_t v) { return static_cast(htons(v)); } inline uint16_t hton(uint16_t v) { return htons(v); } inline int32_t hton(int32_t v) { return static_cast(htonl(v)); } inline uint32_t hton(uint32_t v) { return htonl(v); } inline uint64_t hton(uint64_t v) { #ifdef _WIN32 return htonll(v); #else return htobe64(v); #endif } inline int64_t hton(int64_t v) { return hton(static_cast(v)); } template ::value && !std::is_const::value>::type> inline T &hton_inplace(T &v) { v = hton(v); return v; } template ::value && !std::is_const::value>::type> std::pair::iterator, size_t> push(std::vector &buffer, T v) { v = hton_inplace(v); char *data = reinterpret_cast(&v); auto it = buffer.insert(buffer.end(), data, data + sizeof(T)); return std::make_pair(it, sizeof(T)); } template ::value && !std::is_const::value>::type> const char *pop(const char *data, T &v) { v = ntoh(*reinterpret_cast(data)); return data + sizeof(T); } } class base_database; class result; class error : public std::exception { public: error() : m_errmsg() {} explicit error(PGconn *conn, PGVerbosity verbosity = PQERRORS_DEFAULT, PGContextVisibility show_context = PQSHOW_CONTEXT_ERRORS) { // PQsetErrorVerbosity(conn, verbosity); // PQsetErrorContextVisibility(conn, show_context); const char *errmsg = PQerrorMessage(conn); if (errmsg) m_errmsg = errmsg; else m_errmsg.clear(); } explicit error(PGresult *res) { const char *errmsg = PQresultErrorMessage(res); if (errmsg) m_errmsg = errmsg; else m_errmsg.clear(); } explicit error(const char *errmsg) : m_errmsg(errmsg) {} virtual const char *what() const NOEXCEPT override { return m_errmsg.data(); } operator bool() const { return !m_errmsg.empty(); } protected: std::string m_errmsg; }; class timeout : public error { public: timeout() { m_errmsg = "timeout"; } }; inline void verify_pgtypes_error(int ret) { if (ret && errno != 0) throw std::system_error(std::error_code(errno, std::generic_category())); } struct interval { ::interval *value; interval() { value = PGTYPESinterval_new(); } explicit interval(char *str) { value = PGTYPESinterval_from_asc(str, nullptr); } interval(const interval &src) : interval() { verify_pgtypes_error(PGTYPESinterval_copy(src.value, value)); } interval(interval &&src) { value = src.value; src.value = PGTYPESinterval_new(); } ~interval() { PGTYPESinterval_free(value); } std::string to_string() const { return PGTYPESinterval_to_asc(value); } interval &operator=(const interval &src) { if (&src != this) verify_pgtypes_error(PGTYPESinterval_copy(src.value, value)); return *this; } }; struct timestamp { ::timestamp value; timestamp() = default; static timestamp now() { timestamp result; PGTYPEStimestamp_current(&result.value); return result; } explicit timestamp(char *str) { value = PGTYPEStimestamp_from_asc(str, nullptr); verify_pgtypes_error(1); } int format(char *str, int n, const char *format) const { timestamp temp = *this; return PGTYPEStimestamp_fmt_asc(&temp.value, str, n, format); } static timestamp parse(char *str, const char *format) { timestamp result; verify_pgtypes_error(PGTYPEStimestamp_defmt_asc(str, format, &result.value)); return result; } std::string to_string() const { char *str = PGTYPEStimestamp_to_asc(value); std::string result = str; PGTYPESchar_free(str); return result; } timestamp &operator+=(const interval &span) { verify_pgtypes_error(PGTYPEStimestamp_add_interval(&value, span.value, &value)); return *this; } timestamp &operator-=(const interval &span) { verify_pgtypes_error(PGTYPEStimestamp_sub_interval(&value, span.value, &value)); return *this; } }; inline timestamp operator+(const timestamp &a, const interval &b) { timestamp result = a; return result += b; } inline timestamp operator-(const timestamp &a, const interval &b) { timestamp result = a; result -= b; return result; } struct timestamptz { ::TimestampTz value; /* timestamptz() = default; explicit timestamptz(pg_time_t v) { value = (TimestampTz)v - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY); value *= USECS_PER_SEC; } static timestamptz now() { timestamptz result; auto tp = std::chrono::system_clock::now(); int sec = tp.time_since_epoch().count()*std::nano::num/std::nano::den; int usec = tp.time_since_epoch().count()*std::nano::num % std::nano::den; result.value = (TimestampTz)sec - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY); result.value = (result.value * USECS_PER_SEC) + usec; return result; } */ }; struct date { ::date value; date() = default; explicit date(timestamp dt) { value = PGTYPESdate_from_timestamp(dt.value); } explicit date(char *str) { value = PGTYPESdate_from_asc(str, nullptr); verify_pgtypes_error(1); } explicit date(int year, int month, int day) { int mdy[3] = {month, day, year}; PGTYPESdate_mdyjul(mdy, &value); } std::string to_string() const { char *str = PGTYPESdate_to_asc(value); std::string result = str; PGTYPESchar_free(str); return str; } static date now() { date result; PGTYPESdate_today(&result.value); return result; } static date parse(char *str, const char *format) { date result; verify_pgtypes_error(PGTYPESdate_defmt_asc(&result.value, format, str)); return result; } std::string format(const char *format) { std::string result; result.resize(128); verify_pgtypes_error(PGTYPESdate_fmt_asc(value, format, const_cast(result.data()))); result.resize(strlen(result.data())); return result; } std::tuple get_date() { int mdy[3]; PGTYPESdate_julmdy(value, mdy); return std::make_tuple(mdy[2], mdy[0], mdy[1]); } int dayofweek() { return PGTYPESdate_dayofweek(value); } }; struct decimal { ::decimal value; }; struct numeric { ::numeric *value; numeric() { value = PGTYPESnumeric_new(); } numeric(int v) : numeric() { verify_pgtypes_error(PGTYPESnumeric_from_int(v, value)); } numeric(long v) : numeric() { verify_pgtypes_error(PGTYPESnumeric_from_long(v, value)); } numeric(double v) : numeric() { verify_pgtypes_error(PGTYPESnumeric_from_double(v, value)); } numeric(const decimal &v) : numeric() { verify_pgtypes_error(PGTYPESnumeric_from_decimal(const_cast<::decimal *>(&v.value), value)); } numeric(const numeric &src) : numeric() { verify_pgtypes_error(PGTYPESnumeric_copy(src.value, value)); } explicit numeric(const char *str) { value = PGTYPESnumeric_from_asc(const_cast(str), nullptr); } ~numeric() { PGTYPESnumeric_free(value); } operator double() const { double result; verify_pgtypes_error(PGTYPESnumeric_to_double(value, &result)); return result; } operator int() const { int result; verify_pgtypes_error(PGTYPESnumeric_to_int(value, &result)); return result; } operator long() const { long result; verify_pgtypes_error(PGTYPESnumeric_to_long(value, &result)); return result; } operator decimal() const { decimal result; verify_pgtypes_error(PGTYPESnumeric_to_decimal(value, &result.value)); return result; } int compare(const numeric &other) const { return PGTYPESnumeric_cmp(value, other.value); } inline numeric &operator+=(const numeric &b) { verify_pgtypes_error(PGTYPESnumeric_add(value, b.value, value)); return *this; } inline numeric &operator-=(const numeric &b) { verify_pgtypes_error(PGTYPESnumeric_sub(value, b.value, value)); return *this; } inline numeric &operator*=(const numeric &b) { verify_pgtypes_error(PGTYPESnumeric_mul(value, b.value, value)); return *this; } inline numeric &operator/=(const numeric &b) { verify_pgtypes_error(PGTYPESnumeric_div(value, b.value, value)); return *this; } std::string to_string(int dscale = -1) const { char *str = PGTYPESnumeric_to_asc(value, dscale); std::string result = str; PGTYPESchar_free(str); return result; } }; inline numeric operator+(const numeric &a, const numeric &b) { numeric result; verify_pgtypes_error(PGTYPESnumeric_add(a.value, b.value, result.value)); return result; } inline numeric operator-(const numeric &a, const numeric &b) { numeric result; verify_pgtypes_error(PGTYPESnumeric_sub(a.value, b.value, result.value)); return result; } inline numeric operator*(const numeric &a, const numeric &b) { numeric result; verify_pgtypes_error(PGTYPESnumeric_mul(a.value, b.value, result.value)); return result; } inline numeric operator/(const numeric &a, const numeric &b) { numeric result; verify_pgtypes_error(PGTYPESnumeric_div(a.value, b.value, result.value)); return result; } inline bool operator==(const numeric &a, const numeric &b) { return a.compare(b) == 0; } inline bool operator<(const numeric &a, const numeric &b) { return a.compare(b) < 0; } inline bool operator>(const numeric &a, const numeric &b) { return a.compare(b) > 0; } inline bool operator<=(const numeric &a, const numeric &b) { return a.compare(b) <= 0; } inline bool operator>=(const numeric &a, const numeric &b) { return a.compare(b) >= 0; } inline bool operator!=(const numeric &a, const numeric &b) { return a.compare(b) != 0; } class large_object : public qtl::blobbuf { public: large_object() : m_conn(nullptr), m_id(InvalidOid), m_fd(-1) {} large_object(PGconn *conn, Oid loid, std::ios_base::openmode mode) { open(conn, loid, mode); } large_object(const large_object &) = delete; large_object(large_object &&src) { swap(src); src.m_conn = nullptr; src.m_fd = -1; } ~large_object() { close(); } static large_object create(PGconn *conn, Oid loid = InvalidOid) { Oid oid = lo_create(conn, loid); if (oid == InvalidOid) throw error(conn); return large_object(conn, oid, std::ios::in | std::ios::out | std::ios::binary); } static large_object load(PGconn *conn, const char *filename, Oid loid = InvalidOid) { Oid oid = lo_import_with_oid(conn, filename, loid); if (oid == InvalidOid) throw error(conn); return large_object(conn, oid, std::ios::in | std::ios::out | std::ios::binary); } void save(const char *filename) const { if (lo_export(m_conn, m_id, filename) < 0) throw error(m_conn); } void unlink() { close(); if (lo_unlink(m_conn, m_id) < 0) throw error(m_conn); } large_object &operator=(const large_object &) = delete; large_object &operator=(large_object &&src) { if (this != &src) { swap(src); src.close(); } return *this; } bool is_open() const { return m_fd >= 0; } Oid oid() const { return m_id; } void open(PGconn *conn, Oid loid, std::ios_base::openmode mode) { int lomode = 0; if (mode & std::ios_base::in) lomode |= INV_READ; if (mode & std::ios_base::out) lomode |= INV_WRITE; m_conn = conn; m_id = loid; m_fd = lo_open(m_conn, loid, lomode); if (m_fd < 0) throw error(m_conn); m_size = size(); init_buffer(mode); if (mode & std::ios_base::trunc) { if (lo_truncate(m_conn, m_fd, 0) < 0) throw error(m_conn); } } void close() { if (m_fd >= 0) { overflow(); if (lo_close(m_conn, m_fd) < 0) throw error(m_conn); m_fd = -1; } } void flush() { if (m_fd >= 0) overflow(); } size_t size() const { pg_int64 size = 0; if (m_fd >= 0) { pg_int64 org = lo_tell64(m_conn, m_fd); size = lo_lseek64(m_conn, m_fd, 0, SEEK_END); lo_lseek64(m_conn, m_fd, org, SEEK_SET); } return size; } void resize(size_t n) { if (m_fd >= 0 && lo_truncate64(m_conn, m_fd, n) < 0) throw error(m_conn); } void swap(large_object &other) { std::swap(m_conn, other.m_conn); std::swap(m_id, other.m_id); std::swap(m_fd, other.m_fd); qtl::blobbuf::swap(other); } protected: enum { default_buffer_size = 4096 }; virtual bool read_blob(char *buffer, off_type &count, pos_type position) override { return lo_lseek64(m_conn, m_fd, position, SEEK_SET) >= 0 && lo_read(m_conn, m_fd, buffer, count) > 0; } virtual void write_blob(const char *buffer, size_t count) override { if (lo_write(m_conn, m_fd, buffer, count) < 0) throw error(m_conn); } private: PGconn *m_conn; Oid m_id; int m_fd; }; struct array_header { int32_t ndim; int32_t flags; int32_t elemtype; struct dimension { int32_t length; int32_t lower_bound; } dims[1]; }; /* template struct oid_traits { typedef T value_type; static Oid type_id; static Oid array_type_id; //optional static const char* get(value_type& result, const char* begin, const char* end); static std::pair data(const T& v, std::vector& buffer); }; */ template struct base_object_traits { typedef T value_type; enum { type_id = id }; static bool is_match(Oid v) { return v == type_id; } }; template struct object_traits; #define QTL_POSTGRES_SIMPLE_TRAITS(T, oid, array_oid) \ template <> \ struct object_traits : public base_object_traits \ { \ enum \ { \ array_type_id = array_oid \ }; \ static const char *get(value_type &result, const char *data, const char *end) \ { \ result = *reinterpret_cast(data); \ return data + sizeof(value_type); \ } \ static std::pair data(const T &v, std::vector & /*data*/) \ { \ return std::make_pair(reinterpret_cast(&v), sizeof(T)); \ } \ }; QTL_POSTGRES_SIMPLE_TRAITS(bool, BOOLOID, 1000) QTL_POSTGRES_SIMPLE_TRAITS(char, CHAROID, 1002) QTL_POSTGRES_SIMPLE_TRAITS(float, FLOAT4OID, FLOAT4ARRAYOID) QTL_POSTGRES_SIMPLE_TRAITS(double, FLOAT8OID, 1022) template struct integral_traits : public base_object_traits { enum { array_type_id = array_id }; typedef typename base_object_traits::value_type value_type; static const char *get(value_type &v, const char *data, const char *end) { return detail::pop(data, v); } static std::pair data(value_type v, std::vector &buffer) { size_t n = buffer.size(); detail::push(buffer, v); return std::make_pair(buffer.data() + n, buffer.size() - n); } }; template <> struct object_traits : public integral_traits { }; template <> struct object_traits : public integral_traits { }; template <> struct object_traits : public integral_traits { }; template <> struct object_traits : public integral_traits { }; template struct text_traits : public base_object_traits { enum { array_type_id = TEXTARRAYOID }; }; template <> struct object_traits : public text_traits { static bool is_match(Oid v) { return v == TEXTOID || v == VARCHAROID || v == BPCHAROID; } static const char *get(const char *&result, const char *data, const char *end) { result = data; return end; } static std::pair data(const char *v, std::vector & /*buffer*/) { return std::make_pair(v, strlen(v)); } }; template <> struct object_traits : public object_traits { }; template <> struct object_traits : public text_traits { static bool is_match(Oid v) { return v == TEXTOID || v == VARCHAROID || v == BPCHAROID; } static const char *get(value_type &result, const char *data, const char *end) { result.assign(data, end); return end; } static std::pair data(const std::string &v, std::vector & /*buffer*/) { return std::make_pair(v.data(), v.size()); } }; template <> struct object_traits : public base_object_traits { enum { array_type_id = TIMESTAMPOID + 1 }; static const char *get(value_type &result, const char *data, const char *end) { result = *reinterpret_cast(data); result.value = detail::ntoh(result.value); return data + sizeof(timestamp); } static std::pair data(const timestamp &v, std::vector &buffer) { size_t n = buffer.size(); detail::push(buffer, v.value); return std::make_pair(buffer.data() + n, buffer.size() - n); } }; template <> struct object_traits : public base_object_traits { enum { array_type_id = TIMESTAMPTZOID + 1 }; static const char *get(value_type &result, const char *data, const char *end) { result = *reinterpret_cast(data); result.value = detail::ntoh(result.value); return data + sizeof(timestamptz); } static std::pair data(const timestamptz &v, std::vector &buffer) { size_t n = buffer.size(); detail::push(buffer, v.value); return std::make_pair(buffer.data() + n, buffer.size() - n); } }; template <> struct object_traits : public base_object_traits { enum { array_type_id = INTERVALOID + 1 }; static const char *get(value_type &result, const char *data, const char *end) { const ::interval *value = reinterpret_cast(data); result.value->time = detail::ntoh(value->time); result.value->month = detail::ntoh(value->month); return data + sizeof(interval); } static std::pair data(const interval &v, std::vector &buffer) { size_t n = buffer.size(); detail::push(buffer, v.value->time); detail::push(buffer, v.value->month); return std::make_pair(buffer.data() + n, buffer.size() - n); } }; template <> struct object_traits : public base_object_traits { enum { array_type_id = 1182 }; static const char *get(value_type &result, const char *data, const char *end) { result = *reinterpret_cast(data); result.value = detail::ntoh(result.value); return data + sizeof(date); } static std::pair data(const date &v, std::vector &buffer) { size_t n = buffer.size(); detail::push(buffer, v.value); return std::make_pair(buffer.data() + n, buffer.size() - n); } }; template struct bytea_traits : public base_object_traits { enum { array_type_id = 1001 }; }; template <> struct object_traits : public bytea_traits { static const char *get(value_type &result, const char *data, const char *end) { result.data = data; result.size = end - data; return end; } static std::pair data(const qtl::const_blob_data &v, std::vector & /*buffer*/) { assert(v.size <= UINT32_MAX); return std::make_pair(static_cast(v.data), v.size); } }; template <> struct object_traits : public bytea_traits { static const char *get(qtl::blob_data &value, const char *data, const char *end) { if (value.size < end - data) throw std::out_of_range("no enough buffer to receive blob data."); memcpy(value.data, data, end - data); return end; } static std::pair data(const qtl::blob_data &v, std::vector & /*buffer*/) { assert(v.size <= UINT32_MAX); return std::make_pair(static_cast(v.data), v.size); } }; template <> struct object_traits> : public bytea_traits> { static const char *get(value_type &result, const char *data, const char *end) { result.assign(data, end); return end; } static std::pair data(const std::vector &v, std::vector & /*buffer*/) { assert(v.size() <= UINT32_MAX); return std::make_pair(reinterpret_cast(v.data()), v.size()); } }; template <> struct object_traits : public base_object_traits { enum { array_type_id = OIDARRAYOID }; static value_type get(PGconn *conn, const char *data, const char *end) { int32_t oid; object_traits::get(oid, data, end); return large_object(conn, oid, std::ios::in | std::ios::out | std::ios::binary); } static std::pair data(const large_object &v, std::vector &buffer) { return object_traits::data(v.oid(), buffer); } }; template struct vector_traits : public base_object_traits, id> { typedef typename base_object_traits, id>::value_type value_type; static const char *get(value_type &result, const char *data, const char *end) { if (end - data < sizeof(array_header)) throw std::overflow_error("insufficient data left in message"); array_header header = *reinterpret_cast(data); detail::ntoh_inplace(header.ndim); detail::ntoh_inplace(header.flags); detail::ntoh_inplace(header.elemtype); detail::ntoh_inplace(header.dims[0].length); detail::ntoh_inplace(header.dims[0].lower_bound); if (header.ndim != 1 || !object_traits::is_match(header.elemtype)) throw std::bad_cast(); data += sizeof(array_header); result.reserve(header.dims[0].length); for (int32_t i = 0; i != header.dims[0].length; i++) { int32_t size; T value; data = detail::pop(data, size); if (end - data < size) throw std::overflow_error("insufficient data left in message"); data = object_traits::get(value, data, data + size); if (data > end) throw std::overflow_error("insufficient data left in message"); result.push_back(value); } return data; } static std::pair data(const std::vector &v, std::vector &buffer) { assert(v.size() <= INT32_MAX); size_t n = buffer.size(); buffer.resize(n + sizeof(array_header)); array_header *header = reinterpret_cast(buffer.data() + n); header->ndim = detail::hton(1); header->flags = detail::hton(0); header->elemtype = detail::hton(static_cast(object_traits::type_id)); header->dims[0].length = detail::hton(static_cast(v.size())); header->dims[0].lower_bound = detail::hton(1); std::vector temp; for (const T &e : v) { std::pair blob = object_traits::data(e, temp); detail::push(buffer, static_cast(blob.second)); buffer.insert(buffer.end(), blob.first, blob.first + blob.second); } return std::make_pair(buffer.data() + n, buffer.size() - n); } }; template struct iterator_traits : public base_object_traits { static const char *get(Iterator first, Iterator last, const char *data, const char *end) { if (end - data < sizeof(array_header)) throw std::overflow_error("insufficient data left in message"); array_header header = *reinterpret_cast(data); detail::ntoh_inplace(header.ndim); detail::ntoh_inplace(header.flags); detail::ntoh_inplace(header.elemtype); detail::ntoh_inplace(header.dims[0].length); detail::ntoh_inplace(header.dims[0].lower_bound); if (header.ndim != 1 || !object_traits::value_type>::is_match(header.elemtype)) throw std::bad_cast(); data += sizeof(array_header); if (std::distance(first, last) < header.dims[0].length) throw std::out_of_range("length of array out of range"); Iterator it = first; for (int32_t i = 0; i != header.dims[0].length; i++, it++) { int32_t size; data = detail::pop(data, size); if (end - data < size) throw std::overflow_error("insufficient data left in message"); data = object_traits::value_type>::get(*it, data, data + size); if (data >= end) throw std::overflow_error("insufficient data left in message"); } return data; } static std::pair data(Iterator first, Iterator last, std::vector &buffer) { assert(std::distance(first, last) <= INT32_MAX); size_t n = buffer.size(); buffer.resize(n + sizeof(array_header)); array_header *header = reinterpret_cast(buffer.data() + n); header->ndim = detail::hton(1); header->flags = detail::hton(0); header->elemtype = detail::hton(static_cast(object_traits::value_type>::type_id)); header->dims[0].length = detail::hton(static_cast(std::distance(first, last))); header->dims[0].lower_bound = detail::hton(1); std::vector temp; for (Iterator it = first; it != last; it++) { std::pair blob = object_traits::value_type>::data(*it, temp); detail::push(buffer, static_cast(blob.second)); buffer.insert(buffer.end(), blob.first, blob.first + blob.second); } return std::make_pair(buffer.data() + n, buffer.size() - n); } }; template struct range_traits : public base_object_traits, id> { static const char *get(std::pair &result, const char *data, const char *end) { return iterator_traits::get(result.first, result.second, data, end); } static std::pair data(const std::pair &v, std::vector &buffer) { return iterator_traits::data(v.first, v.second, buffer); } }; template struct object_traits> : public vector_traits::array_type_id> { }; template struct object_traits::value_type>::value, Iterator>::type, Iterator>> : public range_traits::value_type>::array_type_id> { }; template struct carray_traits : public base_object_traits { static const char *get(T (&result)[N], const char *data, const char *end) { return iterator_traits::get(std::begin(result), std::end(result), data, end); } static std::pair data(const T (&v)[N], std::vector &buffer) { return iterator_traits::data(std::begin(v), std::end(v), buffer); } }; template struct array_traits : public base_object_traits, id> { static const char *get(std::array &result, const char *data, const char *end) { return iterator_traits::get(std::begin(result), std::end(result), data, end); } static std::pair data(const std::array &v, std::vector &buffer) { return iterator_traits::data(std::begin(v), std::end(v), buffer); } }; template struct object_traits : public carray_traits::array_type_id> { }; template struct object_traits> : public array_traits::array_type_id> { }; namespace detail { struct field_header { Oid type; int32_t length; }; template static const char *get_field(Type &field, const char *data, const char *end) { field_header header = *reinterpret_cast(data); detail::ntoh_inplace(header.type); detail::ntoh_inplace(header.length); data += sizeof(field_header); if (end - data < header.length) throw std::overflow_error("insufficient data left in message"); return object_traits::get(field, data, data + header.length); } template struct get_field_helper { const char *operator()(Tuple &result, const char *data, const char *end) { if (end - data < sizeof(field_header)) throw std::overflow_error("insufficient data left in message"); auto &field = std::get::value - N>(result); data = get_field(field, data, end); get_field_helper()(result, data, end); return data; } }; template struct get_field_helper { const char *operator()(Tuple &result, const char *data, const char *end) { if (end - data < sizeof(field_header)) throw std::overflow_error("insufficient data left in message"); auto &field = std::get::value - 1>(result); return get_field(field, data, end); } }; template static void push_field(const Type &field, std::vector &buffer) { std::vector temp; detail::push(buffer, static_cast(object_traits::type_id)); auto result = object_traits::data(field, temp); detail::push(buffer, static_cast(result.second)); buffer.insert(buffer.end(), result.first, result.first + result.second); } template struct push_field_helper { void operator()(const Tuple &data, std::vector &buffer) { const auto &field = std::get::value - N>(data); push_field(field, buffer); push_field_helper()(data, buffer); } }; template struct push_field_helper { void operator()(const Tuple &data, std::vector &buffer) { const auto &field = std::get::value - 1>(data); push_field(field, buffer); } }; template static const char *get_fields(Tuple &result, const char *data, const char *end) { return get_field_helper::value>()(result, data, end); } template static void push_fields(const Tuple &data, std::vector &buffer) { push_field_helper::value>()(data, buffer); } } template struct tuple_traits : public base_object_traits { typedef typename base_object_traits::value_type value_type; static const char *get(value_type &result, const char *data, const char *end) { int32_t count; data = detail::pop(data, count); if (data >= end) throw std::overflow_error("insufficient data left in message"); if (std::tuple_size::value != count) throw std::bad_cast(); return detail::get_fields(result, data, end); } static std::pair data(const value_type &v, std::vector &buffer) { size_t n = buffer.size(); detail::push(buffer, static_cast(std::tuple_size::value)); detail::push_fields(v, buffer); return std::make_pair(buffer.data() + n, buffer.size() - n); } }; template struct object_traits> : public tuple_traits, InvalidOid> { }; template struct object_traits> : public tuple_traits, InvalidOid> { }; struct binder { binder() = default; template explicit binder(const T &v) { m_type = object_traits::value(); auto pair = object_traits::data(v); m_value = pair.first; m_length = pair.second; } binder(const char *data, size_t n, Oid oid) { m_type = oid; m_value = data; m_length = n; } Oid constexpr type() const { return m_type; } size_t length() const { return m_length; } const char *value() const { return m_value; } template T get() { if (!object_traits::is_match(m_type)) throw std::bad_cast(); T v; object_traits::get(v, m_value, m_value + m_length); return v; } template T get(PGconn *conn) { if (!object_traits::is_match(m_type)) throw std::bad_cast(); return object_traits::get(conn, m_value, m_value + m_length); } template void get(T &v) { if (object_traits::type_id != InvalidOid && !object_traits::is_match(m_type)) throw std::bad_cast(); object_traits::get(v, m_value, m_value + m_length); } void bind(std::nullptr_t) { m_value = nullptr; m_length = 0; } void bind(qtl::null) { bind(nullptr); } template ::value>::type> void bind(const T &v) { typedef typename std::decay::type param_type; if (m_type != 0 && !object_traits::is_match(m_type)) throw std::bad_cast(); auto pair = object_traits::data(v, m_data); m_value = pair.first; m_length = pair.second; } void bind(const char *data, size_t length = 0) { m_value = data; if (length > 0) m_length = length; else m_length = strlen(data); } template void bind(const T (&v)[N]) { if (m_type != 0 && !object_traits::is_match(m_type)) throw std::bad_cast(); auto pair = object_traits::data(v, m_data); m_value = pair.first; m_length = pair.second; } private: Oid m_type; const char *m_value; size_t m_length; std::vector m_data; }; template inline void make_binder_list_helper(std::array &binders, Arg &&arg, Other &&...other) { binders[I] = binder(arg); make_binder_list_helper(binders, std::forward(other)...); } template inline std::array make_binder_list(Args &&...args) { std::array binders; binders.reserve(sizeof...(Args)); make_binder_list_helper(binders, std::forward(args)...); return binders; } template inline bool in_impl(const T &from, const T &to) { return std::equal_to()(from, to); } template inline bool in_impl(const T &from, const T &to, const Ts &...other) { return std::equal_to()(from, to) || in_impl(from, other...); } template inline bool in(const T &v) { return in_impl(v, values...); } class result { public: result(PGresult *res) : m_res(res) {} result(const result &) = delete; result(result &&src) { m_res = src.m_res; src.m_res = nullptr; } result &operator=(const result &) = delete; result &operator=(result &&src) { if (this != &src) { clear(); m_res = src.m_res; src.m_res = nullptr; } return *this; } ~result() { clear(); } PGresult *handle() const { return m_res; } operator bool() const { return m_res != nullptr; } ExecStatusType status() const { return PQresultStatus(m_res); } long long affected_rows() const { char *result = PQcmdTuples(m_res); if (result) return strtoll(result, nullptr, 10); else return 0LL; } unsigned int get_column_count() const { return PQnfields(m_res); } int get_param_count() const { return PQnparams(m_res); } Oid get_param_type(int col) const { return PQparamtype(m_res, col); } const char *get_column_name(int col) const { return PQfname(m_res, col); } int get_column_index(const char *name) const { return PQfnumber(m_res, name); } int get_column_length(int col) const { return PQfsize(m_res, col); } Oid get_column_type(int col) const { return PQftype(m_res, col); } const char *get_value(int row, int col) const { return PQgetvalue(m_res, row, col); } bool is_null(int row, int col) const { return PQgetisnull(m_res, row, col); } int length(int row, int col) const { return PQgetlength(m_res, row, col); } Oid insert_oid() const { return PQoidValue(m_res); } template void verify_error() { if (m_res) { ExecStatusType got = status(); if (!in(got)) throw error(m_res); } } template void verify_error(error &e) { if (m_res) { ExecStatusType got = status(); if (!in(got)) e = error(m_res); } } void clear() { if (m_res) { PQclear(m_res); m_res = nullptr; } } private: PGresult *m_res; }; class base_statement { friend class error; public: explicit base_statement(base_database &db); ~base_statement() { } base_statement(const base_statement &) = delete; base_statement(base_statement &&src) : m_conn(src.m_conn), m_binders(std::move(src.m_binders)), m_res(std::move(src.m_res)), _name(std::move(src._name)) { } base_statement &operator=(const base_statement &) = delete; base_statement &operator=(base_statement &&src) { if (this != &src) { close(); m_conn = src.m_conn; m_binders = std::move(src.m_binders); m_res = std::move(src.m_res); } return *this; } result &get_result() { return m_res; } void close() { m_res = nullptr; } uint64_t affetced_rows() const { return m_res.affected_rows(); } void bind_param(size_t index, const char *param, size_t length) { m_binders[index].bind(param, length); } template void bind_param(size_t index, const Param ¶m) { m_binders[index].bind(param); } template void bind_field(size_t index, Type &&value) { if (m_res.is_null(0, static_cast(index))) value = Type(); else value = m_binders[index].get::type>(); } void bind_field(size_t index, char *value, size_t length) { memcpy(value, m_binders[index].value(), std::min(length, m_binders[index].length())); } template void bind_field(size_t index, std::array &&value) { bind_field(index, value.data(), value.size()); } template void bind_field(size_t index, bind_string_helper &&value) { value.assign(m_binders[index].value(), m_binders[index].length()); } template void bind_field(size_t index, indicator &&value) { if (m_res) { qtl::bind_field(*this, index, value.data); value.is_null = m_res.is_null(0, static_cast(index)); value.length = m_res.length(0, static_cast(index)); value.is_truncated = m_binders[index].length() < value.length; } } void bind_field(size_t index, large_object &&value) { if (m_res.is_null(0, static_cast(index))) value.close(); else value = m_binders[index].get(m_conn); } void bind_field(size_t index, blob_data &&value) { if (m_res.is_null(0, static_cast(index))) { value.data = nullptr; value.size = 0; } else { m_binders[index].get(value); } } template void bind_field(size_t index, std::tuple &&value) { if (m_res.is_null(0, static_cast(index))) value = std::tuple(); else m_binders[index].get(value); } #ifdef _QTL_ENABLE_CPP17 template inline void bind_field(size_t index, std::optional &&value) { if (m_res.is_null(0, static_cast(index))) { value.reset(); } else { T v; bind_field(index, v); value = std::move(v); } } void bind_field(size_t index, std::any &&value) { if (m_res.is_null(0, static_cast(index))) { value = nullptr; } else { Oid oid = m_res.get_column_type(index); switch (oid) { case object_traits::type_id: value = field_cast(index); break; case object_traits::type_id: value = field_cast(index); break; case object_traits::type_id: value = field_cast(index); break; case object_traits::type_id: value = field_cast(index); break; case object_traits::type_id: value = field_cast(index); break; case object_traits::type_id: value = field_cast(index); break; case object_traits::type_id: value = field_cast(index); break; case object_traits::type_id: value = field_cast(index); break; case object_traits::type_id: value = field_cast(index); break; case object_traits::type_id: value = field_cast(index); break; case object_traits::type_id: value = field_cast(index); break; case object_traits::type_id: value = field_cast(index); break; case object_traits>::type_id: value = field_cast>(index); break; case object_traits::array_type_id: value = field_cast>(index); break; case object_traits::array_type_id: value = field_cast>(index); break; case object_traits::array_type_id: value = field_cast>(index); break; case object_traits::array_type_id: value = field_cast>(index); break; case object_traits::array_type_id: value = field_cast>(index); break; case object_traits::array_type_id: value = field_cast>(index); break; case object_traits::array_type_id: value = field_cast>(index); break; case object_traits::array_type_id: value = field_cast>(index); break; case object_traits::array_type_id: value = field_cast>(index); break; case object_traits::array_type_id: value = field_cast>(index); break; case object_traits::array_type_id: value = field_cast>(index); break; case object_traits::array_type_id: value = field_cast>(index); break; default: throw postgres::error("Unsupported field type"); } } } #endif // C++17 protected: PGconn *m_conn; result m_res; std::string _name; std::vector m_binders; template void verify_error() { if (m_res) m_res.verify_error(); else throw error(m_conn); } void finish(result &res) { while (res) { res = PQgetResult(m_conn); } } template T field_cast(size_t index) { T v; m_binders[index].get(v); return v; } }; class statement : public base_statement { public: explicit statement(base_database &db) : base_statement(db) { } statement(const statement &) = delete; statement(statement &&src) : base_statement(std::move(src)) { } ~statement() { finish(m_res); if (!_name.empty()) { std::ostringstream oss; oss << "DEALLOCATE " << _name << ";"; result res = PQexec(m_conn, oss.str().data()); error e(res.handle()); } } void open(const char *command, int nParams = 0, const Oid *paramTypes = nullptr) { _name.resize(sizeof(intptr_t) * 2 + 1); int n = sprintf(const_cast(_name.data()), "q%p", this); _name.resize(n); std::transform(_name.begin(), _name.end(), _name.begin(), tolower); result res = PQprepare(m_conn, _name.data(), command, nParams, paramTypes); res.verify_error(); } template void open(const char *command) { auto binder_list = make_binder_list(Types()...); std::array types; std::transform(binder_list.begin(), binder_list.end(), types.begin(), [](const binder &b) { return b.type(); }); open(command, types.size(), types.data()); } void attach(const char *name) { result res = PQdescribePrepared(m_conn, name); res.verify_error(); _name = name; } void execute() { if (!PQsendQueryPrepared(m_conn, _name.data(), 0, nullptr, nullptr, nullptr, 1)) throw error(m_conn); if (!PQsetSingleRowMode(m_conn)) throw error(m_conn); m_res = PQgetResult(m_conn); verify_error(); } template void execute(const Types ¶ms) { const size_t count = qtl::params_binder::size; if (count > 0) { m_binders.resize(count); qtl::bind_params(*this, params); std::array values; std::array lengths; std::array formats; for (size_t i = 0; i != m_binders.size(); i++) { values[i] = m_binders[i].value(); lengths[i] = static_cast(m_binders[i].length()); formats[i] = 1; } if (!PQsendQueryPrepared(m_conn, _name.data(), static_cast(m_binders.size()), values.data(), lengths.data(), formats.data(), 1)) throw error(m_conn); } else { if (!PQsendQueryPrepared(m_conn, _name.data(), 0, nullptr, nullptr, nullptr, 1)) throw error(m_conn); } if (!PQsetSingleRowMode(m_conn)) throw error(m_conn); m_res = PQgetResult(m_conn); verify_error(); } template bool fetch(Types &&values) { if (m_res) { ExecStatusType status = m_res.status(); if (status == PGRES_SINGLE_TUPLE) { int count = m_res.get_column_count(); if (count > 0) { m_binders.resize(count); for (int i = 0; i != count; i++) { m_binders[i] = binder(m_res.get_value(0, i), m_res.length(0, i), m_res.get_column_type(i)); } qtl::bind_record(*this, std::forward(values)); } m_res = PQgetResult(m_conn); return true; } else { verify_error(); } } return false; } bool next_result() { m_res = PQgetResult(m_conn); return m_res && m_res.status() == PGRES_SINGLE_TUPLE; } void reset() { finish(m_res); m_res.clear(); } }; class base_database { protected: base_database() { m_conn = nullptr; } public: typedef postgres::error exception_type; base_database(const base_database &) = delete; base_database(base_database &&src) { m_conn = src.m_conn; src.m_conn = nullptr; } ~base_database() { if (m_conn) PQfinish(m_conn); } base_database &operator=(const base_database &) = delete; base_database &operator=(base_database &&src) { if (this != &src) { if (m_conn) PQfinish(m_conn); m_conn = src.m_conn; src.m_conn = nullptr; } return *this; } const char *errmsg() const { return PQerrorMessage(m_conn); } PGconn *handle() { return m_conn; } const char *encoding() const { int encoding = PQclientEncoding(m_conn); return (encoding >= 0) ? pg_encoding_to_char(encoding) : nullptr; } void encoding(const char *encoding) { if (PQsetClientEncoding(m_conn, encoding)) throw error(m_conn); } void trace(FILE *stream) { PQtrace(m_conn, stream); } void untrace() { PQuntrace(m_conn); } const char *current() const { return PQdb(m_conn); } const char *user() const { return PQuser(m_conn); } const char *host() const { return PQhost(m_conn); } const char *password() const { return PQpass(m_conn); } const char *port() const { return PQport(m_conn); } const char *options() const { return PQoptions(m_conn); } ConnStatusType status() const { return PQstatus(m_conn); } PGTransactionStatusType transactionStatus() const { return PQtransactionStatus(m_conn); } const char *parameterStatus(const char *paramName) const { return PQparameterStatus(m_conn, paramName); } void reset() { if (status() == CONNECTION_BAD) PQreset(m_conn); } void close() { PQfinish(m_conn); m_conn = nullptr; } protected: PGconn *m_conn; void throw_exception() { throw postgres::error(m_conn); } }; class simple_statment : public base_statement { public: simple_statment(base_database &db, qtl::postgres::result &&res) : base_statement(db) { m_res = std::move(res); } template void fetch_all(ValueProc &proc) { int row_count = PQntuples(m_res.handle()); if (row_count > 0) { int col_count = m_res.get_column_count(); m_binders.resize(col_count); auto values = qtl::detail::make_values(proc); for (int i = 0; i != row_count; i++) { for (int j = 0; j != col_count; j++) { m_binders[j] = binder(m_res.get_value(i, j), m_res.length(i, j), m_res.get_column_type(j)); } qtl::bind_record(*this, std::forward(values)); qtl::detail::apply(proc, std::forward(values)); } } } }; class database : public base_database, public qtl::base_database { public: database() = default; bool open(const std::map ¶ms, bool expand_dbname = false) { std::vector keywords(params.size() + 1); std::vector values(params.size() + 1); for (auto ¶m : params) { keywords.push_back(param.first.data()); values.push_back(param.second.data()); } keywords.push_back(nullptr); values.push_back(nullptr); m_conn = PQconnectdbParams(keywords.data(), values.data(), expand_dbname); return m_conn != nullptr && status() == CONNECTION_OK; } bool open(const char *conninfo) { m_conn = PQconnectdb(conninfo); return m_conn != nullptr && status() == CONNECTION_OK; } bool open(const char *host, const char *user, const char *password, unsigned short port = 5432, const char *db = "postgres", const char *options = nullptr) { char port_text[16]; sprintf(port_text, "%u", port); m_conn = PQsetdbLogin(host, port_text, options, nullptr, db, user, password); return m_conn != nullptr && status() == CONNECTION_OK; } statement open_command(const char *query_text, size_t /*text_length*/) { statement stmt(*this); stmt.open(query_text); return stmt; } statement open_command(const char *query_text) { return open_command(query_text, 0); } statement open_command(const std::string &query_text) { return open_command(query_text.data()); } void simple_execute(const char *query_text, uint64_t *paffected = nullptr) { qtl::postgres::result res(PQexec(m_conn, query_text)); if (!res) throw_exception(); res.verify_error(); if (paffected) *paffected = res.affected_rows(); } template void simple_query(const char *query_text, ValueProc &&proc) { qtl::postgres::result res(PQexec(m_conn, query_text)); if (!res) throw_exception(); res.verify_error(); if (res.status() == PGRES_TUPLES_OK) { simple_statment stmt(*this, std::move(res)); stmt.fetch_all(std::forward(proc)); } } void auto_commit(bool on) { if (on) simple_execute("SET AUTOCOMMIT TO ON"); else simple_execute("SET AUTOCOMMIT TO OFF"); } void begin_transaction() { simple_execute("BEGIN"); } void rollback() { simple_execute("ROLLBACK"); } void commit() { simple_execute("COMMIT"); } bool is_alive() { qtl::postgres::result res(PQexec(m_conn, "")); return res && res.status() == PGRES_COMMAND_OK; } }; inline int event_flags(PostgresPollingStatusType status) { int flags = 0; if (status == PGRES_POLLING_READING) flags |= event::ef_read; else if (status == PGRES_POLLING_WRITING) flags |= event::ef_write; else if (status == PGRES_POLLING_FAILED) flags |= event::ef_exception; return flags; } class async_connection; template inline void async_wait(qtl::event *event, PGconn *conn, int timeout, Handler &&handler) { int flushed = PQflush(conn); if (flushed < 0) { handler(error(conn)); return; } if (flushed == 1) { event->set_io_handler(qtl::event::ef_read | qtl::event::ef_write, timeout, [event, conn, timeout, handler](int flags) mutable { if (flags & qtl::event::ef_timeout) { handler(postgres::timeout()); return; } if (flags & qtl::event::ef_read) { if (!PQconsumeInput(conn)) { handler(error(conn)); return; } } if (flags & (qtl::event::ef_read | qtl::event::ef_write | event::ef_exception)) async_wait(event, conn, timeout, handler); }); } else { event->set_io_handler(qtl::event::ef_read, 10, [event, conn, timeout, handler](int flags) mutable { if (flags & qtl::event::ef_timeout) { handler(postgres::timeout()); } else if (flags & (qtl::event::ef_read | qtl::event::ef_exception)) { if (PQconsumeInput(conn)) { if (!PQisBusy(conn)) handler(postgres::error()); else async_wait(event, conn, timeout, handler); } else { handler(postgres::error(conn)); } } else { handler(postgres::error(conn)); } }); } } class async_statement : public base_statement { public: async_statement(async_connection &db); async_statement(async_statement &&src) : base_statement(std::move(src)), m_timeout(2) { m_event = src.m_event; m_timeout = src.m_timeout; src.m_event = nullptr; } async_statement &operator=(async_statement &&src) { if (this != &src) { base_statement::operator=(std::move(src)); m_event = src.m_event; m_timeout = src.m_timeout; src.m_event = nullptr; } return *this; } ~async_statement() { close(); } /* Handler defiens as: void handler(const qtl::mysql::error& e); */ template void open(Handler &&handler, const char *command, int nParams = 0, const Oid *paramTypes = nullptr) { _name.resize(sizeof(intptr_t) * 2 + 1); int n = sprintf(const_cast(_name.data()), "q%p", this); _name.resize(n); std::transform(_name.begin(), _name.end(), _name.begin(), tolower); if (PQsendPrepare(m_conn, _name.data(), command, nParams, paramTypes)) { async_wait([this, handler](error e) mutable { if (!e) { m_res = PQgetResult(m_conn); if (m_res) { m_res.verify_error(e); while(m_res) m_res = PQgetResult(m_conn); } } handler(e); }); } else { _name.clear(); handler(error(m_conn)); } } template void open(Handler &&handler, const char *command) { auto binder_list = make_binder_list(Types()...); std::array types; std::transform(binder_list.begin(), binder_list.end(), types.begin(), [](const binder &b) { return b.type(); }); open(std::forward(handler), command, types.size(), types.data()); } void close() { while (m_res) { m_res = PQgetResult(m_conn); } if (!_name.empty()) { std::ostringstream oss; oss << "DEALLOCATE " << _name << ";"; result res = PQexec(m_conn, oss.str().data()); error e; res.verify_error(e); finish(res); if (e) throw e; } base_statement::close(); } template void close(Handler &&handler) { while (m_res) { if (PQisBusy(m_conn)) { async_wait([this, handler](const error &e) mutable { close(handler); }); } else { m_res = PQgetResult(m_conn); } } if (!_name.empty() && PQstatus(m_conn) == CONNECTION_OK) { std::ostringstream oss; oss << "DEALLOCATE " << _name << ";"; bool ok = PQsendQuery(m_conn, oss.str().data()); if (ok) { async_wait([this, handler](postgres::error e) mutable { if (PQstatus(m_conn) == CONNECTION_OK) { result res(PQgetResult(m_conn)); if (res) res.verify_error(e); if (!e) _name.clear(); finish(res); handler(e); } else { _name.clear(); handler(error()); } }); } else { handler(error(m_conn)); } } else { _name.clear(); } } /* ExecuteHandler defiens as: void handler(const qtl::mysql::error& e, uint64_t affected); */ template void execute(ExecuteHandler &&handler) { if (PQsendQueryPrepared(m_conn, _name.data(), 0, nullptr, nullptr, nullptr, 1) && PQsetSingleRowMode(m_conn)) { async_wait([this, handler](error e) { if (!e) { m_res = PQgetResult(m_conn); m_res.verify_error(e); finish(m_res); } handler(e); }); } else { handler(error(m_conn)); } } template void execute(const Types ¶ms, Handler &&handler) { const size_t count = qtl::params_binder::size; if (count > 0) { m_binders.resize(count); qtl::bind_params(*this, params); std::array values; std::array lengths; std::array formats; for (size_t i = 0; i != m_binders.size(); i++) { values[i] = m_binders[i].value(); lengths[i] = static_cast(m_binders[i].length()); formats[i] = 1; } if (!PQsendQueryPrepared(m_conn, _name.data(), static_cast(m_binders.size()), values.data(), lengths.data(), formats.data(), 1)) { handler(error(m_conn), 0); return; } } else { if (!PQsendQueryPrepared(m_conn, _name.data(), 0, nullptr, nullptr, nullptr, 1)) { handler(error(m_conn), 0); return; } } if (!PQsetSingleRowMode(m_conn)) { handler(error(m_conn), 0); return; } if (PQisBusy(m_conn)) { async_wait([this, handler](error e) mutable { if (!e) { m_res = PQgetResult(m_conn); m_res.verify_error(e); int64_t affected = m_res.affected_rows(); finish(m_res); handler(e, affected); } else { handler(e, 0); } }); } } template void fetch(Types &&values, RowHandler &&row_handler, FinishHandler &&finish_handler) { if (m_res) { ExecStatusType status = m_res.status(); if (status == PGRES_SINGLE_TUPLE) { int count = m_res.get_column_count(); if (count > 0) { m_binders.resize(count); for (int i = 0; i != count; i++) { m_binders[i] = binder(m_res.get_value(0, i), m_res.length(0, i), m_res.get_column_type(i)); } qtl::bind_record(*this, std::forward(values)); } row_handler(); if (PQisBusy(m_conn)) { async_wait([this, &values, row_handler, finish_handler](const error &e) { if (e) { finish_handler(e); } else { m_res = PQgetResult(m_conn); fetch(std::forward(values), row_handler, finish_handler); } }); } else { m_res = PQgetResult(m_conn); fetch(std::forward(values), row_handler, finish_handler); } } else { error e; m_res.verify_error(e); finish_handler(e); } } else { finish_handler(error()); } } template void next_result(Handler &&handler) { async_wait([this, handler](const error &e) { if (e) { handler(e); } else { m_res = PQgetResult(m_conn); handler(error()); } }); } private: event *m_event; int m_timeout; template void async_wait(Handler &&handler) { qtl::postgres::async_wait(m_event, m_conn, m_timeout, std::forward(handler)); } }; class async_connection : public base_database, public qtl::async_connection { public: async_connection() : m_connect_timeout(2), m_query_timeout(2) { } async_connection(async_connection &&src) : base_database(std::move(src)), m_connect_timeout(src.m_connect_timeout), m_query_timeout(src.m_query_timeout) { } async_connection &operator=(async_connection &&src) { if (this != &src) { base_database::operator=(std::move(src)); m_connect_timeout = src.m_connect_timeout; m_query_timeout = src.m_query_timeout; } return *this; } /* OpenHandler defines as: void handler(const qtl::postgres::error& e) NOEXCEPT; */ template void open(EventLoop &ev, OpenHandler &&handler, const std::map ¶ms, bool expand_dbname = false) { std::vector keywords; std::vector values; keywords.reserve(params.size()); values.reserve(params.size()); for (auto ¶m : params) { keywords.push_back(param.first.data()); values.push_back(param.second.data()); } keywords.push_back(nullptr); values.push_back(nullptr); m_conn = PQconnectStartParams(keywords.data(), values.data(), expand_dbname); if (m_conn == nullptr) throw std::bad_alloc(); if (status() == CONNECTION_BAD) { handler(error(m_conn)); return; } if (PQsetnonblocking(m_conn, true) != 0) handler(error(m_conn)); get_options(); bind(ev); wait_connect(std::forward(handler)); } template void open(EventLoop &ev, OpenHandler &&handler, const char *conninfo) { m_conn = PQconnectStart(conninfo); if (m_conn == nullptr) throw std::bad_alloc(); if (status() == CONNECTION_BAD) { handler(error(m_conn)); return; } PQsetnonblocking(m_conn, true); get_options(); bind(ev); wait_connect(std::forward(handler)); } template void reset(OpenHandler &&handler) { PQresetStart(m_conn); wait_reset(std::forward(handler)); } /* Handler defines as: void handler(const qtl::mysql::error& e, uint64_t affected) NOEXCEPT; */ template void simple_execute(ExecuteHandler &&handler, const char *query_text) NOEXCEPT { bool ok = PQsendQuery(m_conn, query_text); if (ok) { async_wait([this, handler](postgres::error e) mutable { result res(PQgetResult(m_conn)); res.verify_error(e); uint64_t affected = res.affected_rows(); handler(e, affected); while (res) res = PQgetResult(m_conn); }); } else { handler(error(m_conn), 0); } } template void auto_commit(Handler &&handler, bool on) NOEXCEPT { simple_execute(std::forward(handler), on ? "SET AUTOCOMMIT TO ON" : "SET AUTOCOMMIT TO OFF"); } template void begin_transaction(Handler &&handler) NOEXCEPT { simple_execute(std::forward(handler), "BEGIN"); } template void rollback(Handler &&handler) NOEXCEPT { simple_execute(std::forward(handler), "ROLLBACK"); } template void commit(Handler &&handler) NOEXCEPT { simple_execute(std::forward(handler), "COMMIT"); } /* ResultHandler defines as: void result_handler(const qtl::postgres::error& e) NOEXCEPT; */ template void simple_query(const char *query, RowHandler &&row_handler, ResultHandler &&result_handler) NOEXCEPT { bool ok = PQsendQuery(m_conn, query); if (ok) { async_wait([this, row_handler, result_handler](postgres::error e) mutable { result res(PQgetResult(m_conn)); res.verify_error(e); if (e) { result_handler(e, 0); return; } uint64_t affected = res.affected_rows(); while (res && res.status() == PGRES_TUPLES_OK) { simple_statment stmt(*this, std::move(res)); stmt.fetch_all(row_handler); res = PQgetResult(m_conn); } result_handler(e, affected); }); } else { result_handler(error(m_conn), 0); } } template void open_command(const char *query_text, size_t /*text_length*/, Handler &&handler) { std::shared_ptr stmt = std::make_shared(*this); stmt->open([stmt, handler](const postgres::error &e) mutable { handler(e, stmt); }, query_text, 0); } template void is_alive(Handler &&handler) NOEXCEPT { simple_execute(std::forward(handler), ""); } socket_type socket() const NOEXCEPT { return PQsocket(m_conn); } int connect_timeout() const { return m_connect_timeout; } void connect_timeout(int timeout) { m_connect_timeout = timeout; } int query_timeout() const { return m_query_timeout; } void query_timeout(int timeout) { m_query_timeout = timeout; } private: int m_connect_timeout; int m_query_timeout; void get_options() { PQconninfoOption *options = PQconninfo(m_conn); m_connect_timeout = 2; for (PQconninfoOption *option = options; option; option++) { if (strcmp(option->keyword, "connect_timeout") == 0) { if (option->val) m_connect_timeout = atoi(option->val); break; } } PQconninfoFree(options); } template void wait_connect(OpenHandler &&handler) NOEXCEPT { PostgresPollingStatusType status = PQconnectPoll(m_conn); switch (status) { case PGRES_POLLING_READING: case PGRES_POLLING_WRITING: m_event_handler->set_io_handler(event_flags(status), m_connect_timeout, [this, handler](int flags) mutable { if (flags & event::ef_timeout) { handler(postgres::timeout()); } else if (flags & (event::ef_read | event::ef_write | event::ef_exception)) wait_connect(std::forward(handler)); }); break; case PGRES_POLLING_FAILED: handler(postgres::error(handle())); break; case PGRES_POLLING_OK: // PQsetnonblocking(m_conn, true); handler(postgres::error()); } } template void wait_reset(OpenHandler &&handler) NOEXCEPT { PostgresPollingStatusType status = PQresetPoll(m_conn); switch (status) { case PGRES_POLLING_READING: case PGRES_POLLING_WRITING: m_event_handler->set_io_handler(event_flags(status), m_connect_timeout, [this, handler](int flags) mutable { if (flags & event::ef_timeout) { handler(postgres::timeout()); } else if (flags & (event::ef_read | event::ef_write | event::ef_exception)) wait_reset(std::forward(handler)); }); break; case PGRES_POLLING_FAILED: handler(postgres::error(m_conn)); break; case PGRES_POLLING_OK: handler(postgres::error()); } } template void async_wait(Handler &&handler) { qtl::postgres::async_wait(event(), m_conn, m_query_timeout, std::forward(handler)); } }; inline async_statement::async_statement(async_connection &db) : base_statement(static_cast(db)) { m_event = db.event(); m_timeout = db.query_timeout(); } typedef qtl::transaction transaction; template using query_iterator = qtl::query_iterator; template using query_result = qtl::query_result; inline base_statement::base_statement(base_database &db) : m_res(nullptr) { m_conn = db.handle(); m_res = nullptr; } } } #endif //_SQL_POSTGRES_H_