From 0d3e994e33aa85965616a064bbf635ec8d043c1a Mon Sep 17 00:00:00 2001
From: znone <glyc@sina.com.cn>
Date: Fri, 02 Oct 2020 06:11:48 +0000
Subject: [PATCH] The database can be operated asynchronously through asio.
---
test/TestMariaDB.cpp | 462 ++++++++++++---------------------------------------------
1 files changed, 98 insertions(+), 364 deletions(-)
diff --git a/test/TestMariaDB.cpp b/test/TestMariaDB.cpp
index 8435e6b..c78eda6 100644
--- a/test/TestMariaDB.cpp
+++ b/test/TestMariaDB.cpp
@@ -6,260 +6,11 @@
#include <time.h>
#include <limits.h>
#include "../include/qtl_mysql_pool.hpp"
+#include "../include/qtl_asio.hpp"
#if MARIADB_VERSION_ID < 0100000
#error "The program need mariadb version > 10.0"
#endif
-
-class simple_event_loop
-{
-public:
- simple_event_loop()
- {
- m_expired = 0;
- m_maxfd = 0;
- m_stoped = false;
- FD_ZERO(&m_readset);
- FD_ZERO(&m_writeset);
- FD_ZERO(&m_exceptset);
- }
-
- void reset()
- {
- m_stoped = false;
- }
-
- void run()
- {
- while (!m_stoped)
- do_once(nullptr);
- }
-
- void run_for(long seconds)
- {
- time_t expired, now;
- time(&now);
- expired = now + seconds;
- timeval tv = { seconds };
- do
- {
- do_once(&tv);
- time(&now);
- tv.tv_sec = expired - now;
- tv.tv_usec = 0;
- } while (!m_stoped && now < expired);
- }
-
- void stop()
- {
- m_stoped = true;
- }
-
-private:
- class event_item : public qtl::event
- {
- public:
- event_item(simple_event_loop* loop, qtl::socket_type fd)
- : m_loop(loop), m_fd(fd), m_expired(LONG_MAX)
- {
- }
-
- virtual void set_io_handler(int flags, long timeout, std::function<void(int)>&& handler) NOEXCEPT override
- {
- if (timeout > 0)
- {
- time_t now;
- time(&now);
- if (m_expired > now + timeout)
- m_expired = now + timeout;
- m_loop->set(this, flags, m_expired);
- }
- else
- {
- m_expired = LONG_MAX;
- m_loop->set(this, flags, 0);
- }
- m_io_handler = std::forward<std::function<void(int)>>(handler);
- }
- virtual void remove() override
- {
- m_loop->remove(this);
- }
- virtual bool is_busying() override
- {
- return m_io_handler!=nullptr;
- }
-
- simple_event_loop* m_loop;
- qtl::socket_type m_fd;
- time_t m_expired;
- std::function<void(int)> m_io_handler;
- };
- std::vector<std::unique_ptr<event_item>> m_events;
-
- //implement for QTL
-public:
- template<typename Connection>
- event_item* add(Connection* connection)
- {
- qtl::socket_type fd = connection->socket();
- event_item* result = new event_item(this, fd);
- std::unique_ptr<event_item> item(result);
- m_events.emplace_back(std::move(item));
- return result;
- }
-
- template<typename Handler>
- event_item* set_timeout(const timeval& timeout, Handler&& handler)
- {
- event_item* result = new event_item(this, 0);
- std::unique_ptr<event_item> item(result);
- ::time(&result->m_expired);
- result->m_expired+=timeout.tv_sec;
- if(m_expired>result->m_expired)
- m_expired=result->m_expired;
- m_events.emplace_back(std::move(item));
- return result;
- }
-
-public:
- void set(event_item* item, int flags, time_t expired)
- {
- if (item->m_fd + 1 > m_maxfd)
- m_maxfd = item->m_fd + 1;
- if (flags&qtl::event::ef_read)
- FD_SET(item->m_fd, &m_readset);
- if (flags&qtl::event::ef_write)
- FD_SET(item->m_fd, &m_writeset);
- if (flags&qtl::event::ef_exception)
- FD_SET(item->m_fd, &m_exceptset);
- if (expired > 0 && m_expired > expired)
- m_expired = expired;
- }
-
- void remove(event_item* item)
- {
- auto it = m_events.begin();
- while (it != m_events.end())
- {
- if (it->get() == item)
- {
- m_events.erase(it);
- return;
- }
- ++it;
- }
- }
-
-private:
- fd_set m_readset, m_writeset, m_exceptset;
- qtl::socket_type m_maxfd;
- int do_once(timeval* timeout)
- {
- fd_set rs = m_readset, ws = m_writeset, es = m_exceptset;
- timeval tv = { INT_MAX, 0 };
- time_t now;
- time(&now);
- if (timeout)
- {
- tv = *timeout;
- timeout = &tv;
- }
-
- long d = m_expired - now;
- if (d > 0 && tv.tv_sec > d)
- {
- tv.tv_sec = d;
- timeout = &tv;
- }
-
- qtl::socket_type maxfd = m_maxfd;
- FD_ZERO(&m_readset);
- FD_ZERO(&m_writeset);
- FD_ZERO(&m_exceptset);
- m_maxfd = 0;
-
- if (maxfd == 0)
- {
- if (timeout)
- {
- std::this_thread::sleep_for(std::chrono::microseconds(timeout->tv_sec*std::micro::den + timeout->tv_usec));
- check_timeout();
- }
- else
- {
- std::this_thread::sleep_for(std::chrono::seconds(1));
- }
- return 0;
- }
- else
- {
- int ret = select(maxfd, &rs, &ws, &es, timeout);
- if (ret > 0)
- {
- for (auto& item : m_events)
- {
- int flags = 0;
- if (FD_ISSET(item->m_fd, &rs))
- flags |= qtl::event::ef_read;
- if (FD_ISSET(item->m_fd, &ws))
- flags |= qtl::event::ef_write;
- if (FD_ISSET(item->m_fd, &es))
- flags |= qtl::event::ef_exception;
- if (flags && item->m_io_handler)
- {
- auto handler = std::move(item->m_io_handler);
- handler(flags);
- }
- }
- }
- else if (ret == 0)
- {
- time(&now);
- for (auto& item : m_events)
- {
- if (item->m_expired > 0 && item->m_io_handler && item->m_expired < now)
- {
- item->m_io_handler(qtl::event::ef_timeout);
- item->m_expired = 0;
- item->m_io_handler = nullptr;
- }
- }
- }
- else if (ret < 0)
- {
- int errc;
-#ifdef _WIN32
- errc = WSAGetLastError();
-#else
- errc = errno;
-#endif
- throw std::system_error(errc, std::system_category());
- }
- return ret;
- }
- }
-
- void check_timeout()
- {
- time_t now;
- time(&now);
- for (auto& item : m_events)
- {
- if (item->m_expired > 0 && item->m_io_handler && item->m_expired < now)
- {
- item->m_io_handler(qtl::event::ef_timeout);
- item->m_expired = 0;
- item->m_io_handler = nullptr;
- }
- }
- }
-
- time_t m_expired;
- bool m_stoped;
-};
-
-simple_event_loop ev;
using namespace qtl::mysql;
@@ -270,152 +21,135 @@
const char mysql_server[]="localhost";
const char mysql_user[]="root";
-const char mysql_password[]="";
+const char mysql_password[]="123456";
const char mysql_database[]="test";
-void SimpleTest()
+class MariaDBTest
{
- async_connection connection;
- ev.reset();
- connection.open(ev, [&connection](const error& e) {
- if (e)
- {
- LogError(e);
- ev.stop();
- }
- else
- {
- printf("Connect to mysql ok.\n");
- connection.simple_query("select * from test", 0, [](MYSQL_ROW row, int field_count) {
- for (int i = 0; i != field_count; i++)
- printf("%s\t", row[i]);
- printf("\n");
- return true;
- }, [&connection](const error& e, size_t row_count) {
- if (e)
- LogError(e);
- else
- printf("Total %lu rows.\n", row_count);
+public:
+ MariaDBTest()
+ {
+ Open();
+ _service.run();
+ }
- connection.close([]() {
- printf("Close connection ok.\n");
- ev.stop();
- });
- });
- }
- }, mysql_server, mysql_user, mysql_password, mysql_database);
+private:
+ void Open();
+ void Close();
+ void SimpleQuery();
+ void Execute();
+ void Query();
+ void MultiQuery();
- ev.run();
-}
+private:
+ async_connection _connection;
+ qtl::asio::service _service;
-void ExecuteTest()
-{
- async_connection connection;
- ev.reset();
- connection.open(ev, [&connection](const error& e) {
- if (e)
- {
- LogError(e);
- ev.stop();
- }
- else
- {
- printf("Connect to mysql ok.\n");
- connection.execute([&connection](const error& e, uint64_t affected) mutable {
- if (e)
- LogError(e);
- else
- printf("Insert %llu records ok.\n", affected);
- connection.close([]() {
- printf("Close connection ok.\n");
- ev.stop();
- });
- }, "insert into test(name, createtime, company) values(?, now(), ?)", 0, std::make_tuple("test name", "test company"));
- }
- }, mysql_server, mysql_user, mysql_password, mysql_database);
-
- ev.run();
};
-void QueryTest()
+void MariaDBTest::Open()
{
- async_connection connection;
- ev.reset();
- connection.open(ev, [&connection](const error& e) {
+ _connection.open(_service, [this](const error& e) {
if (e)
{
LogError(e);
- ev.stop();
+ _service.stop();
}
else
{
printf("Connect to mysql ok.\n");
- connection.query("select id, name, CreateTime, Company from test", 0,
- [](int64_t id, const std::string& name, const qtl::mysql::time& create_time, const std::string& company) {
- char szTime[128] = { 0 };
- if (create_time.year != 0)
- {
- struct tm tm;
- create_time.as_tm(tm);
- strftime(szTime, 128, "%c", &tm);
- }
- printf("%lld\t%s\t%s\t%s\n", id, name.data(), szTime, company.data());
- }, [&connection](const error& e) {
- printf("query has completed.\n");
- if (e)
- LogError(e);
-
- connection.close([]() {
- printf("Close connection ok.\n");
- ev.stop();
- });
- });
+ SimpleQuery();
}
}, mysql_server, mysql_user, mysql_password, mysql_database);
-
- ev.run();
}
-void MultiQueryTest()
+void MariaDBTest::Close()
{
- async_connection connection;
- ev.reset();
- connection.open(ev, [&connection](const error& e) {
+ _connection.close([this]() {
+ printf("Connection is closed.\n");
+ _service.stop();
+ });
+}
+
+void MariaDBTest::SimpleQuery()
+{
+ _connection.simple_query("select * from test", 0, [](MYSQL_ROW row, int field_count) {
+ for (int i = 0; i != field_count; i++)
+ printf("%s\t", row[i]);
+ printf("\n");
+ return true;
+ }, [this](const error& e, size_t row_count) {
if (e)
{
LogError(e);
- ev.stop();
}
else
{
- printf("Connect to mysql ok.\n");
- connection.query_multi("call test_proc",
- [&connection](const error& e) {
- if (e)
- LogError(e);
-
- connection.close([]() {
- printf("Close connection ok.\n");
- ev.stop();
- });
- }, [](uint32_t i, const std::string& str) {
- printf("0=\"%d\", 'hello world'=\"%s\"\n", i, str.data());
- }, [](const qtl::mysql::time& time) {
- struct tm tm;
- time.as_tm(tm);
- printf("current time is: %s\n", asctime(&tm));
- });
+ printf("Total %lu rows.\n", row_count);
+ Execute();
}
- }, mysql_server, mysql_user, mysql_password, mysql_database);
+ });
+}
- ev.run();
+void MariaDBTest::Execute()
+{
+ _connection.simple_query("select * from test", 0, [](MYSQL_ROW row, int field_count) {
+ for (int i = 0; i != field_count; i++)
+ printf("%s\t", row[i]);
+ printf("\n");
+ return true;
+ }, [this](const error& e, size_t row_count) {
+ if (e)
+ LogError(e);
+ else
+ {
+ printf("Total %lu rows.\n", row_count);
+ Query();
+ }
+ });
+}
+
+void MariaDBTest::Query()
+{
+ _connection.query("select id, name, CreateTime, Company from test", 0,
+ [](int64_t id, const std::string& name, const qtl::mysql::time& create_time, const std::string& company) {
+ char szTime[128] = { 0 };
+ if (create_time.year != 0)
+ {
+ struct tm tm;
+ create_time.as_tm(tm);
+ strftime(szTime, 128, "%c", &tm);
+ }
+ printf("%lld\t%s\t%s\t%s\n", id, name.data(), szTime, company.data());
+ }, [this](const error& e) {
+ printf("query has completed.\n");
+ if (e)
+ LogError(e);
+ else
+ MultiQuery();
+ });
+}
+
+void MariaDBTest::MultiQuery()
+{
+ _connection.query_multi("call test_proc",
+ [this](const error& e) {
+ if (e)
+ LogError(e);
+ else
+ Close();
+
+ }, [](uint32_t i, const std::string& str) {
+ printf("0=\"%d\", 'hello world'=\"%s\"\n", i, str.data());
+ }, [](const qtl::mysql::time& time) {
+ struct tm tm;
+ time.as_tm(tm);
+ printf("current time is: %s\n", asctime(&tm));
+ });
}
int main(int argc, char* argv[])
{
- ExecuteTest();
- SimpleTest();
- QueryTest();
- MultiQueryTest();
+ MariaDBTest test;
return 0;
}
--
Gitblit v1.9.3