From ebd997c5ce55fd260696f9bb117f1f06c09a4759 Mon Sep 17 00:00:00 2001
From: znone <glyc@sina.com.cn>
Date: Sun, 16 Jun 2019 13:52:44 +0000
Subject: [PATCH] 通过 MariaDB 的非阻塞函数访问 MySQL。
---
include/qtl_database_pool.hpp | 165 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 165 insertions(+), 0 deletions(-)
diff --git a/include/qtl_database_pool.hpp b/include/qtl_database_pool.hpp
index 73ca349..81c5f28 100644
--- a/include/qtl_database_pool.hpp
+++ b/include/qtl_database_pool.hpp
@@ -137,6 +137,7 @@
return;
m_trying_connection=true;
+
try
{
m_background_thread=std::thread(&database_pool<Database>::background_connect, this);
@@ -175,6 +176,170 @@
}
};
+template<typename T, typename EventLoop, typename Connection>
+class async_pool
+{
+public:
+ typedef Connection value_type;
+ typedef std::shared_ptr<Connection> pointer;
+
+ async_pool(EventLoop& ev)
+ : m_ev(ev), m_trying_connecting(false)
+ {
+ }
+
+ virtual ~async_pool()
+ {
+ clear();
+ }
+
+ /*
+ Handler defines as:
+ void handler(const pointer& ptr);
+ */
+ template<typename Handler>
+ void get(Handler&& handler, EventLoop* ev=nullptr)
+ {
+ Connection* db = popup();
+ if(ev==nullptr) ev=&m_ev;
+
+ if(db)
+ {
+ db->bind(*ev);
+ handler(typename Connection::exception_type(), wrap(db));
+ }
+ else if (m_trying_connecting == false)
+ {
+ create_connection(ev, [this, handler](const typename Connection::exception_type& e, Connection* db) {
+ handler(e, wrap(db));
+ });
+ }
+ else
+ {
+ handler(typename Connection::exception_type(), nullptr);
+ }
+ }
+
+ void test_alive()
+ {
+ if (m_connections.empty())
+ return false;
+ std::unique_lock<std::mutex> lock(m_pool_mutex);
+ auto it = m_connections.begin();
+ while (it != m_connections.end())
+ {
+ Connection* db = *it;
+ db->is_alive([this, db](const typename Connection::exception_type& e) {
+ if (e)
+ {
+ std::unique_lock<std::mutex> lock(m_pool_mutex);
+ auto it = std::find(m_connections.begin(), m_connections.end(), db);
+ delete db;
+ m_connections.erase(it);
+ if (m_connections.empty())
+ try_connect();
+ }
+ });
+ ++it;
+ }
+ }
+
+private:
+ EventLoop& m_ev;
+ std::vector<Connection*> m_connections;
+ std::recursive_mutex m_pool_mutex;
+ std::atomic<bool> m_trying_connecting;
+
+ void recovery(Connection* db)
+ {
+ if (db == NULL) return;
+ db->is_alive([this, db](const typename Connection::exception_type& e) {
+ if (e)
+ {
+ {
+ std::lock_guard<std::recursive_mutex> lock(m_pool_mutex);
+ clear();
+ }
+ try_connect();
+ }
+ else
+ {
+ if(!db->unbind())
+ throw std::runtime_error("destroy a busysing connection.");
+ std::lock_guard<std::recursive_mutex> lock(m_pool_mutex);
+ m_connections.push_back(db);
+ }
+ });
+ }
+
+ template<typename Handler>
+ void create_connection(EventLoop* ev, Handler&& handler)
+ {
+ T* pThis = static_cast<T*>(this);
+ pThis->new_connection(*ev, [this, handler](const typename Connection::exception_type& e, Connection* db) {
+ handler(e, db);
+ if (!db)
+ {
+ std::lock_guard<std::recursive_mutex> lock(m_pool_mutex);
+ clear();
+
+ timeval tv = { 1, 0};
+ m_ev.set_timeout(tv, [this]() {
+ try_connect();
+ });
+ }
+ });
+ }
+
+ Connection* popup()
+ {
+ Connection* db = nullptr;
+ std::lock_guard<std::recursive_mutex> lock(m_pool_mutex);
+ if (!m_connections.empty())
+ {
+ db = m_connections.back();
+ m_connections.pop_back();
+ }
+ return db;
+ }
+
+ void try_connect()
+ {
+ if (m_trying_connecting)
+ return;
+
+ m_trying_connecting = true;
+ create_connection(&m_ev, [this](const typename Connection::exception_type& e, Connection* db) {
+ if (db)
+ {
+ std::lock_guard<std::recursive_mutex> lock(m_pool_mutex);
+ m_connections.push_back(db);
+ }
+ else
+ {
+ m_trying_connecting = false;
+ }
+ });
+ }
+
+ void clear()
+ {
+ std::for_each(m_connections.begin(), m_connections.end(), std::default_delete<Connection>());
+ m_connections.clear();
+ }
+
+ pointer wrap(Connection* db)
+ {
+ if (db)
+ {
+ return pointer(db, [this](Connection* db) {
+ recovery(db);
+ });
+ }
+ else return nullptr;
+ }
+};
+
}
#endif //_QTL_DATABASE_POOL_H_
--
Gitblit v1.9.3