From 0d3e994e33aa85965616a064bbf635ec8d043c1a Mon Sep 17 00:00:00 2001
From: znone <glyc@sina.com.cn>
Date: Fri, 02 Oct 2020 06:11:48 +0000
Subject: [PATCH] The database can be operated asynchronously through asio.

---
 test/TestMariaDB.cpp |  462 ++++++++++++---------------------------------------------
 1 files changed, 98 insertions(+), 364 deletions(-)

diff --git a/test/TestMariaDB.cpp b/test/TestMariaDB.cpp
index 8435e6b..c78eda6 100644
--- a/test/TestMariaDB.cpp
+++ b/test/TestMariaDB.cpp
@@ -6,260 +6,11 @@
 #include <time.h>
 #include <limits.h>
 #include "../include/qtl_mysql_pool.hpp"
+#include "../include/qtl_asio.hpp"
 
 #if MARIADB_VERSION_ID < 0100000
 #error "The program need mariadb version > 10.0"
 #endif 
-
-class simple_event_loop
-{
-public:
-	simple_event_loop()
-	{
-		m_expired = 0;
-		m_maxfd = 0;
-		m_stoped = false;
-		FD_ZERO(&m_readset);
-		FD_ZERO(&m_writeset);
-		FD_ZERO(&m_exceptset);
-	}
-
-	void reset()
-	{
-		m_stoped = false;
-	}
-
-	void run()
-	{
-		while (!m_stoped)
-			do_once(nullptr);
-	}
-
-	void run_for(long seconds)
-	{
-		time_t expired, now;
-		time(&now);
-		expired = now + seconds;
-		timeval tv = { seconds };
-		do
-		{
-			do_once(&tv);
-			time(&now);
-			tv.tv_sec = expired - now;
-			tv.tv_usec = 0;
-		} while (!m_stoped && now < expired);
-	}
-
-	void stop()
-	{
-		m_stoped = true;
-	}
-
-private:
-	class event_item : public qtl::event
-	{
-	public:
-		event_item(simple_event_loop* loop, qtl::socket_type fd)
-			: m_loop(loop), m_fd(fd), m_expired(LONG_MAX)
-		{
-		}
-
-		virtual void set_io_handler(int flags, long timeout, std::function<void(int)>&& handler) NOEXCEPT override
-		{
-			if (timeout > 0)
-			{
-				time_t now;
-				time(&now);
-				if (m_expired > now + timeout)
-					m_expired = now + timeout;
-				m_loop->set(this, flags, m_expired);
-			}
-			else
-			{
-				m_expired = LONG_MAX;
-				m_loop->set(this, flags, 0);
-			}
-			m_io_handler = std::forward<std::function<void(int)>>(handler);
-		}
-		virtual void remove() override
-		{
-			m_loop->remove(this);
-		}
-		virtual bool is_busying() override
-		{
-			return m_io_handler!=nullptr;
-		}
-
-		simple_event_loop* m_loop;
-		qtl::socket_type m_fd;
-		time_t m_expired;
-		std::function<void(int)> m_io_handler;
-	};
-	std::vector<std::unique_ptr<event_item>> m_events;
-
-	//implement for QTL
-public:
-	template<typename Connection>
-	event_item* add(Connection* connection)
-	{
-		qtl::socket_type fd = connection->socket();
-		event_item* result = new event_item(this, fd);
-		std::unique_ptr<event_item> item(result);
-		m_events.emplace_back(std::move(item));
-		return result;
-	}
-
-	template<typename Handler>
-	event_item* set_timeout(const timeval& timeout, Handler&& handler)
-	{
-		event_item* result = new event_item(this, 0);
-		std::unique_ptr<event_item> item(result);
-		::time(&result->m_expired);
-		result->m_expired+=timeout.tv_sec;
-		if(m_expired>result->m_expired)
-			m_expired=result->m_expired;
-		m_events.emplace_back(std::move(item));
-		return result;
-	}
-
-public:
-	void set(event_item* item, int flags, time_t expired)
-	{
-		if (item->m_fd + 1 > m_maxfd)
-			m_maxfd = item->m_fd + 1;
-		if (flags&qtl::event::ef_read)
-			FD_SET(item->m_fd, &m_readset);
-		if (flags&qtl::event::ef_write)
-			FD_SET(item->m_fd, &m_writeset);
-		if (flags&qtl::event::ef_exception)
-			FD_SET(item->m_fd, &m_exceptset);
-		if (expired > 0 && m_expired > expired)
-			m_expired = expired;
-	}
-
-	void remove(event_item* item)
-	{
-		auto it = m_events.begin();
-		while (it != m_events.end())
-		{
-			if (it->get() == item)
-			{
-				m_events.erase(it);
-				return;
-			}
-			++it;
-		}
-	}
-
-private:
-	fd_set m_readset, m_writeset, m_exceptset;
-	qtl::socket_type m_maxfd;
-	int do_once(timeval* timeout)
-	{
-		fd_set rs = m_readset, ws = m_writeset, es = m_exceptset;
-		timeval tv = { INT_MAX, 0 };
-		time_t now;
-		time(&now);
-		if (timeout)
-		{
-			tv = *timeout;
-			timeout = &tv;
-		}
-
-		long d = m_expired - now;
-		if (d > 0 && tv.tv_sec > d)
-		{
-			tv.tv_sec = d;
-			timeout = &tv;
-		}
-
-		qtl::socket_type maxfd = m_maxfd;
-		FD_ZERO(&m_readset);
-		FD_ZERO(&m_writeset);
-		FD_ZERO(&m_exceptset);
-		m_maxfd = 0;
-
-		if (maxfd == 0)
-		{
-			if (timeout)
-			{
-				std::this_thread::sleep_for(std::chrono::microseconds(timeout->tv_sec*std::micro::den + timeout->tv_usec));
-				check_timeout();
-			}
-			else
-			{
-				std::this_thread::sleep_for(std::chrono::seconds(1));
-			}
-			return 0;
-		}
-		else
-		{
-			int ret = select(maxfd, &rs, &ws, &es, timeout);
-			if (ret > 0)
-			{
-				for (auto& item : m_events)
-				{
-					int flags = 0;
-					if (FD_ISSET(item->m_fd, &rs))
-						flags |= qtl::event::ef_read;
-					if (FD_ISSET(item->m_fd, &ws))
-						flags |= qtl::event::ef_write;
-					if (FD_ISSET(item->m_fd, &es))
-						flags |= qtl::event::ef_exception;
-					if (flags && item->m_io_handler)
-					{
-						auto handler = std::move(item->m_io_handler);
-						handler(flags);
-					}
-				}
-			}
-			else if (ret == 0)
-			{
-				time(&now);
-				for (auto& item : m_events)
-				{
-					if (item->m_expired > 0 && item->m_io_handler && item->m_expired < now)
-					{
-						item->m_io_handler(qtl::event::ef_timeout);
-						item->m_expired = 0;
-						item->m_io_handler = nullptr;
-					}
-				}
-			}
-			else if (ret < 0)
-			{
-				int errc;
-#ifdef _WIN32
-				errc = WSAGetLastError();
-#else
-				errc = errno;
-#endif
-				throw std::system_error(errc, std::system_category());
-			}
-			return ret;
-		}
-	}
-
-	void check_timeout()
-	{
-		time_t now;
-		time(&now);
-		for (auto& item : m_events)
-		{
-			if (item->m_expired > 0 && item->m_io_handler && item->m_expired < now)
-			{
-				item->m_io_handler(qtl::event::ef_timeout);
-				item->m_expired = 0;
-				item->m_io_handler = nullptr;
-			}
-		}
-	}
-
-	time_t m_expired;
-	bool m_stoped;
-};
-
-simple_event_loop ev;
 
 using namespace qtl::mysql;
 
@@ -270,152 +21,135 @@
 
 const char mysql_server[]="localhost";
 const char mysql_user[]="root";
-const char mysql_password[]="";
+const char mysql_password[]="123456";
 const char mysql_database[]="test";
 
-void SimpleTest()
+class MariaDBTest
 {
-	async_connection connection;
-	ev.reset();
-	connection.open(ev, [&connection](const error& e) {
-		if (e)
-		{
-			LogError(e);
-			ev.stop();
-		}
-		else
-		{
-			printf("Connect to mysql ok.\n");
-			connection.simple_query("select * from test", 0, [](MYSQL_ROW row, int field_count) {
-				for (int i = 0; i != field_count; i++)
-					printf("%s\t", row[i]);
-				printf("\n");
-				return true;
-			}, [&connection](const error& e, size_t row_count) {
-				if (e)
-					LogError(e);
-				else
-					printf("Total %lu rows.\n", row_count);
+public:
+	MariaDBTest()
+	{
+		Open();
+		_service.run();
+	}
 
-				connection.close([]() {
-					printf("Close connection ok.\n");
-					ev.stop();
-				});
-			});
-		}
-	}, mysql_server, mysql_user, mysql_password, mysql_database);
+private:
+	void Open();
+	void Close();
+	void SimpleQuery();
+	void Execute();
+	void Query();
+	void MultiQuery();
 
-	ev.run();
-}
+private:
+	async_connection _connection;
+	qtl::asio::service _service;
 
-void ExecuteTest()
-{
-	async_connection connection;
-	ev.reset();
-	connection.open(ev, [&connection](const error& e) {
-		if (e)
-		{
-			LogError(e);
-			ev.stop();
-		}
-		else
-		{
-			printf("Connect to mysql ok.\n");
-			connection.execute([&connection](const error& e, uint64_t affected) mutable {
-					if (e)
-						LogError(e);
-					else
-						printf("Insert %llu records ok.\n", affected);
-					connection.close([]() {
-						printf("Close connection ok.\n");
-						ev.stop();
-					});
-			}, "insert into test(name, createtime, company) values(?, now(), ?)", 0, std::make_tuple("test name", "test company"));
-		}
-	}, mysql_server, mysql_user, mysql_password, mysql_database);
-
-	ev.run();
 };
 
-void QueryTest()
+void MariaDBTest::Open()
 {
-	async_connection connection;
-	ev.reset();
-	connection.open(ev, [&connection](const error& e) {
+	_connection.open(_service, [this](const error& e) {
 		if (e)
 		{
 			LogError(e);
-			ev.stop();
+			_service.stop();
 		}
 		else
 		{
 			printf("Connect to mysql ok.\n");
-			connection.query("select id, name, CreateTime, Company from test", 0, 
-				[](int64_t id, const std::string& name, const qtl::mysql::time& create_time, const std::string& company) {
-				char szTime[128] = { 0 };
-				if (create_time.year != 0)
-				{
-					struct tm tm;
-					create_time.as_tm(tm);
-					strftime(szTime, 128, "%c", &tm);
-				}
-				printf("%lld\t%s\t%s\t%s\n", id, name.data(), szTime, company.data());
-			}, [&connection](const error& e) {
-				printf("query has completed.\n");
-				if (e)
-					LogError(e);
-
-				connection.close([]() {
-					printf("Close connection ok.\n");
-					ev.stop();
-				});
-			});
+			SimpleQuery();
 		}
 	}, mysql_server, mysql_user, mysql_password, mysql_database);
-
-	ev.run();
 }
 
-void MultiQueryTest()
+void MariaDBTest::Close()
 {
-	async_connection connection;
-	ev.reset();
-	connection.open(ev, [&connection](const error& e) {
+	_connection.close([this]() {
+		printf("Connection is closed.\n");
+		_service.stop();
+	});
+}
+
+void MariaDBTest::SimpleQuery()
+{
+	_connection.simple_query("select * from test", 0, [](MYSQL_ROW row, int field_count) {
+		for (int i = 0; i != field_count; i++)
+			printf("%s\t", row[i]);
+		printf("\n");
+		return true;
+	}, [this](const error& e, size_t row_count) {
 		if (e)
 		{
 			LogError(e);
-			ev.stop();
 		}
 		else
 		{
-			printf("Connect to mysql ok.\n");
-			connection.query_multi("call test_proc",
-				[&connection](const error& e) {
-				if (e)
-					LogError(e);
-
-				connection.close([]() {
-					printf("Close connection ok.\n");
-					ev.stop();
-				});
-			}, [](uint32_t i, const std::string& str) {
-				printf("0=\"%d\", 'hello world'=\"%s\"\n", i, str.data());
-			}, [](const qtl::mysql::time& time) {
-				struct tm tm;
-				time.as_tm(tm);
-				printf("current time is: %s\n", asctime(&tm));
-			});
+			printf("Total %lu rows.\n", row_count);
+			Execute();
 		}
-	}, mysql_server, mysql_user, mysql_password, mysql_database);
+	});
+}
 
-	ev.run();
+void MariaDBTest::Execute()
+{
+	_connection.simple_query("select * from test", 0, [](MYSQL_ROW row, int field_count) {
+		for (int i = 0; i != field_count; i++)
+			printf("%s\t", row[i]);
+		printf("\n");
+		return true;
+	}, [this](const error& e, size_t row_count) {
+		if (e)
+			LogError(e);
+		else
+		{
+			printf("Total %lu rows.\n", row_count);
+			Query();
+		}
+	});
+}
+
+void MariaDBTest::Query()
+{
+	_connection.query("select id, name, CreateTime, Company from test", 0, 
+		[](int64_t id, const std::string& name, const qtl::mysql::time& create_time, const std::string& company) {
+			char szTime[128] = { 0 };
+			if (create_time.year != 0)
+			{
+				struct tm tm;
+				create_time.as_tm(tm);
+				strftime(szTime, 128, "%c", &tm);
+			}
+			printf("%lld\t%s\t%s\t%s\n", id, name.data(), szTime, company.data());
+	}, [this](const error& e) {
+		printf("query has completed.\n");
+		if (e)
+			LogError(e);
+		else
+			MultiQuery();
+	});
+}
+
+void MariaDBTest::MultiQuery()
+{
+	_connection.query_multi("call test_proc",
+		[this](const error& e) {
+			if (e)
+				LogError(e);
+			else
+				Close();
+			
+	}, [](uint32_t i, const std::string& str) {
+		printf("0=\"%d\", 'hello world'=\"%s\"\n", i, str.data());
+	}, [](const qtl::mysql::time& time) {
+		struct tm tm;
+		time.as_tm(tm);
+		printf("current time is: %s\n", asctime(&tm));
+	});
 }
 
 int main(int argc, char* argv[])
 {
-	ExecuteTest();
-	SimpleTest();
-	QueryTest();
-	MultiQueryTest();
+	MariaDBTest test;
 	return 0;
 }

--
Gitblit v1.9.3