From 16519fab3d99ca277bce00d2bcbfd46b33a46391 Mon Sep 17 00:00:00 2001
From: znone <glyc@sina.com.cn>
Date: Sun, 16 Jun 2019 13:27:41 +0000
Subject: [PATCH] 通过 MariaDB 的非阻塞函数访问 MySQL。

---
 include/qtl_database_pool.hpp |  165 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 165 insertions(+), 0 deletions(-)

diff --git a/include/qtl_database_pool.hpp b/include/qtl_database_pool.hpp
index 73ca349..81c5f28 100644
--- a/include/qtl_database_pool.hpp
+++ b/include/qtl_database_pool.hpp
@@ -137,6 +137,7 @@
 			return;
 
 		m_trying_connection=true;
+		
 		try
 		{
 			m_background_thread=std::thread(&database_pool<Database>::background_connect, this);
@@ -175,6 +176,170 @@
 	}
 };
 
+template<typename T, typename EventLoop, typename Connection>
+class async_pool
+{
+public:
+	typedef Connection value_type;
+	typedef std::shared_ptr<Connection> pointer;
+
+	async_pool(EventLoop& ev)
+		: m_ev(ev), m_trying_connecting(false)
+	{
+	}
+
+	virtual ~async_pool()
+	{
+		clear();
+	}
+
+	/*
+		Handler defines as:
+		void handler(const pointer& ptr);
+	*/
+	template<typename Handler>
+	void get(Handler&& handler, EventLoop* ev=nullptr)
+	{
+		Connection* db = popup();
+		if(ev==nullptr) ev=&m_ev;
+		
+		if(db)
+		{
+			db->bind(*ev);
+			handler(typename Connection::exception_type(), wrap(db));
+		}
+		else if (m_trying_connecting == false)
+		{
+			create_connection(ev, [this, handler](const typename Connection::exception_type& e,  Connection* db) {
+				handler(e, wrap(db));
+			});
+		}
+		else
+		{
+			handler(typename Connection::exception_type(), nullptr);
+		}
+	}
+
+	void test_alive()
+	{
+		if (m_connections.empty())
+			return false;
+		std::unique_lock<std::mutex> lock(m_pool_mutex);
+		auto it = m_connections.begin();
+		while (it != m_connections.end())
+		{
+			Connection* db = *it;
+			db->is_alive([this, db](const typename Connection::exception_type& e) {
+				if (e)
+				{
+					std::unique_lock<std::mutex> lock(m_pool_mutex);
+					auto it = std::find(m_connections.begin(), m_connections.end(), db);
+					delete db;
+					m_connections.erase(it);
+					if (m_connections.empty())
+						try_connect();
+				}
+			});
+			++it;
+		}
+	}
+
+private:
+	EventLoop& m_ev;
+	std::vector<Connection*> m_connections;
+	std::recursive_mutex m_pool_mutex;
+	std::atomic<bool> m_trying_connecting;
+
+	void recovery(Connection* db)
+	{
+		if (db == NULL) return;
+		db->is_alive([this, db](const typename Connection::exception_type& e) {
+			if (e)
+			{
+				{
+					std::lock_guard<std::recursive_mutex> lock(m_pool_mutex);
+					clear();
+				}
+				try_connect();
+			}
+			else
+			{
+				if(!db->unbind())
+					throw std::runtime_error("destroy a busysing connection.");
+				std::lock_guard<std::recursive_mutex> lock(m_pool_mutex);
+				m_connections.push_back(db);
+			}
+		});
+	}
+
+	template<typename Handler>
+	void create_connection(EventLoop* ev, Handler&& handler)
+	{
+		T* pThis = static_cast<T*>(this);
+		pThis->new_connection(*ev, [this, handler](const typename Connection::exception_type& e, Connection* db) {
+			handler(e, db);
+			if (!db)
+			{
+				std::lock_guard<std::recursive_mutex> lock(m_pool_mutex);
+				clear();
+
+				timeval tv = { 1,  0};
+				m_ev.set_timeout(tv, [this]() {
+					try_connect();
+				});
+			}
+		});
+	}
+
+	Connection* popup()
+	{
+		Connection* db = nullptr;
+		std::lock_guard<std::recursive_mutex> lock(m_pool_mutex);
+		if (!m_connections.empty())
+		{
+			db = m_connections.back();
+			m_connections.pop_back();
+		}
+		return db;
+	}
+
+	void try_connect()
+	{
+		if (m_trying_connecting)
+			return;
+
+		m_trying_connecting = true;
+		create_connection(&m_ev, [this](const typename Connection::exception_type& e, Connection* db) {
+			if (db)
+			{
+				std::lock_guard<std::recursive_mutex> lock(m_pool_mutex);
+				m_connections.push_back(db);
+			}
+			else
+			{
+				m_trying_connecting = false;
+			}
+		});
+	}
+
+	void clear()
+	{
+		std::for_each(m_connections.begin(), m_connections.end(), std::default_delete<Connection>());
+		m_connections.clear();
+	}
+
+	pointer wrap(Connection* db)
+	{
+		if (db)
+		{
+			return pointer(db, [this](Connection* db) {
+				recovery(db);
+			});
+		}
+		else return nullptr;
+	}
+};
+
 }
 
 #endif //_QTL_DATABASE_POOL_H_

--
Gitblit v1.9.3