znone
2020-10-02 0d3e994e33aa85965616a064bbf635ec8d043c1a
test/TestMariaDB.cpp
@@ -6,260 +6,11 @@
#include <time.h>
#include <limits.h>
#include "../include/qtl_mysql_pool.hpp"
#include "../include/qtl_asio.hpp"
#if MARIADB_VERSION_ID < 0100000
#error "The program need mariadb version > 10.0"
#endif 
class simple_event_loop
{
public:
   simple_event_loop()
   {
      m_expired = 0;
      m_maxfd = 0;
      m_stoped = false;
      FD_ZERO(&m_readset);
      FD_ZERO(&m_writeset);
      FD_ZERO(&m_exceptset);
   }
   void reset()
   {
      m_stoped = false;
   }
   void run()
   {
      while (!m_stoped)
         do_once(nullptr);
   }
   void run_for(long seconds)
   {
      time_t expired, now;
      time(&now);
      expired = now + seconds;
      timeval tv = { seconds };
      do
      {
         do_once(&tv);
         time(&now);
         tv.tv_sec = expired - now;
         tv.tv_usec = 0;
      } while (!m_stoped && now < expired);
   }
   void stop()
   {
      m_stoped = true;
   }
private:
   class event_item : public qtl::event
   {
   public:
      event_item(simple_event_loop* loop, qtl::socket_type fd)
         : m_loop(loop), m_fd(fd), m_expired(LONG_MAX)
      {
      }
      virtual void set_io_handler(int flags, long timeout, std::function<void(int)>&& handler) NOEXCEPT override
      {
         if (timeout > 0)
         {
            time_t now;
            time(&now);
            if (m_expired > now + timeout)
               m_expired = now + timeout;
            m_loop->set(this, flags, m_expired);
         }
         else
         {
            m_expired = LONG_MAX;
            m_loop->set(this, flags, 0);
         }
         m_io_handler = std::forward<std::function<void(int)>>(handler);
      }
      virtual void remove() override
      {
         m_loop->remove(this);
      }
      virtual bool is_busying() override
      {
         return m_io_handler!=nullptr;
      }
      simple_event_loop* m_loop;
      qtl::socket_type m_fd;
      time_t m_expired;
      std::function<void(int)> m_io_handler;
   };
   std::vector<std::unique_ptr<event_item>> m_events;
   //implement for QTL
public:
   template<typename Connection>
   event_item* add(Connection* connection)
   {
      qtl::socket_type fd = connection->socket();
      event_item* result = new event_item(this, fd);
      std::unique_ptr<event_item> item(result);
      m_events.emplace_back(std::move(item));
      return result;
   }
   template<typename Handler>
   event_item* set_timeout(const timeval& timeout, Handler&& handler)
   {
      event_item* result = new event_item(this, 0);
      std::unique_ptr<event_item> item(result);
      ::time(&result->m_expired);
      result->m_expired+=timeout.tv_sec;
      if(m_expired>result->m_expired)
         m_expired=result->m_expired;
      m_events.emplace_back(std::move(item));
      return result;
   }
public:
   void set(event_item* item, int flags, time_t expired)
   {
      if (item->m_fd + 1 > m_maxfd)
         m_maxfd = item->m_fd + 1;
      if (flags&qtl::event::ef_read)
         FD_SET(item->m_fd, &m_readset);
      if (flags&qtl::event::ef_write)
         FD_SET(item->m_fd, &m_writeset);
      if (flags&qtl::event::ef_exception)
         FD_SET(item->m_fd, &m_exceptset);
      if (expired > 0 && m_expired > expired)
         m_expired = expired;
   }
   void remove(event_item* item)
   {
      auto it = m_events.begin();
      while (it != m_events.end())
      {
         if (it->get() == item)
         {
            m_events.erase(it);
            return;
         }
         ++it;
      }
   }
private:
   fd_set m_readset, m_writeset, m_exceptset;
   qtl::socket_type m_maxfd;
   int do_once(timeval* timeout)
   {
      fd_set rs = m_readset, ws = m_writeset, es = m_exceptset;
      timeval tv = { INT_MAX, 0 };
      time_t now;
      time(&now);
      if (timeout)
      {
         tv = *timeout;
         timeout = &tv;
      }
      long d = m_expired - now;
      if (d > 0 && tv.tv_sec > d)
      {
         tv.tv_sec = d;
         timeout = &tv;
      }
      qtl::socket_type maxfd = m_maxfd;
      FD_ZERO(&m_readset);
      FD_ZERO(&m_writeset);
      FD_ZERO(&m_exceptset);
      m_maxfd = 0;
      if (maxfd == 0)
      {
         if (timeout)
         {
            std::this_thread::sleep_for(std::chrono::microseconds(timeout->tv_sec*std::micro::den + timeout->tv_usec));
            check_timeout();
         }
         else
         {
            std::this_thread::sleep_for(std::chrono::seconds(1));
         }
         return 0;
      }
      else
      {
         int ret = select(maxfd, &rs, &ws, &es, timeout);
         if (ret > 0)
         {
            for (auto& item : m_events)
            {
               int flags = 0;
               if (FD_ISSET(item->m_fd, &rs))
                  flags |= qtl::event::ef_read;
               if (FD_ISSET(item->m_fd, &ws))
                  flags |= qtl::event::ef_write;
               if (FD_ISSET(item->m_fd, &es))
                  flags |= qtl::event::ef_exception;
               if (flags && item->m_io_handler)
               {
                  auto handler = std::move(item->m_io_handler);
                  handler(flags);
               }
            }
         }
         else if (ret == 0)
         {
            time(&now);
            for (auto& item : m_events)
            {
               if (item->m_expired > 0 && item->m_io_handler && item->m_expired < now)
               {
                  item->m_io_handler(qtl::event::ef_timeout);
                  item->m_expired = 0;
                  item->m_io_handler = nullptr;
               }
            }
         }
         else if (ret < 0)
         {
            int errc;
#ifdef _WIN32
            errc = WSAGetLastError();
#else
            errc = errno;
#endif
            throw std::system_error(errc, std::system_category());
         }
         return ret;
      }
   }
   void check_timeout()
   {
      time_t now;
      time(&now);
      for (auto& item : m_events)
      {
         if (item->m_expired > 0 && item->m_io_handler && item->m_expired < now)
         {
            item->m_io_handler(qtl::event::ef_timeout);
            item->m_expired = 0;
            item->m_io_handler = nullptr;
         }
      }
   }
   time_t m_expired;
   bool m_stoped;
};
simple_event_loop ev;
using namespace qtl::mysql;
@@ -270,152 +21,135 @@
const char mysql_server[]="localhost";
const char mysql_user[]="root";
const char mysql_password[]="";
const char mysql_password[]="123456";
const char mysql_database[]="test";
void SimpleTest()
class MariaDBTest
{
   async_connection connection;
   ev.reset();
   connection.open(ev, [&connection](const error& e) {
      if (e)
      {
         LogError(e);
         ev.stop();
      }
      else
      {
         printf("Connect to mysql ok.\n");
         connection.simple_query("select * from test", 0, [](MYSQL_ROW row, int field_count) {
            for (int i = 0; i != field_count; i++)
               printf("%s\t", row[i]);
            printf("\n");
            return true;
         }, [&connection](const error& e, size_t row_count) {
            if (e)
               LogError(e);
            else
               printf("Total %lu rows.\n", row_count);
public:
   MariaDBTest()
   {
      Open();
      _service.run();
   }
            connection.close([]() {
               printf("Close connection ok.\n");
               ev.stop();
            });
         });
      }
   }, mysql_server, mysql_user, mysql_password, mysql_database);
private:
   void Open();
   void Close();
   void SimpleQuery();
   void Execute();
   void Query();
   void MultiQuery();
   ev.run();
}
private:
   async_connection _connection;
   qtl::asio::service _service;
void ExecuteTest()
{
   async_connection connection;
   ev.reset();
   connection.open(ev, [&connection](const error& e) {
      if (e)
      {
         LogError(e);
         ev.stop();
      }
      else
      {
         printf("Connect to mysql ok.\n");
         connection.execute([&connection](const error& e, uint64_t affected) mutable {
               if (e)
                  LogError(e);
               else
                  printf("Insert %llu records ok.\n", affected);
               connection.close([]() {
                  printf("Close connection ok.\n");
                  ev.stop();
               });
         }, "insert into test(name, createtime, company) values(?, now(), ?)", 0, std::make_tuple("test name", "test company"));
      }
   }, mysql_server, mysql_user, mysql_password, mysql_database);
   ev.run();
};
void QueryTest()
void MariaDBTest::Open()
{
   async_connection connection;
   ev.reset();
   connection.open(ev, [&connection](const error& e) {
   _connection.open(_service, [this](const error& e) {
      if (e)
      {
         LogError(e);
         ev.stop();
         _service.stop();
      }
      else
      {
         printf("Connect to mysql ok.\n");
         connection.query("select id, name, CreateTime, Company from test", 0,
            [](int64_t id, const std::string& name, const qtl::mysql::time& create_time, const std::string& company) {
            char szTime[128] = { 0 };
            if (create_time.year != 0)
            {
               struct tm tm;
               create_time.as_tm(tm);
               strftime(szTime, 128, "%c", &tm);
            }
            printf("%lld\t%s\t%s\t%s\n", id, name.data(), szTime, company.data());
         }, [&connection](const error& e) {
            printf("query has completed.\n");
            if (e)
               LogError(e);
            connection.close([]() {
               printf("Close connection ok.\n");
               ev.stop();
            });
         });
         SimpleQuery();
      }
   }, mysql_server, mysql_user, mysql_password, mysql_database);
   ev.run();
}
void MultiQueryTest()
void MariaDBTest::Close()
{
   async_connection connection;
   ev.reset();
   connection.open(ev, [&connection](const error& e) {
   _connection.close([this]() {
      printf("Connection is closed.\n");
      _service.stop();
   });
}
void MariaDBTest::SimpleQuery()
{
   _connection.simple_query("select * from test", 0, [](MYSQL_ROW row, int field_count) {
      for (int i = 0; i != field_count; i++)
         printf("%s\t", row[i]);
      printf("\n");
      return true;
   }, [this](const error& e, size_t row_count) {
      if (e)
      {
         LogError(e);
         ev.stop();
      }
      else
      {
         printf("Connect to mysql ok.\n");
         connection.query_multi("call test_proc",
            [&connection](const error& e) {
            if (e)
               LogError(e);
            connection.close([]() {
               printf("Close connection ok.\n");
               ev.stop();
            });
         }, [](uint32_t i, const std::string& str) {
            printf("0=\"%d\", 'hello world'=\"%s\"\n", i, str.data());
         }, [](const qtl::mysql::time& time) {
            struct tm tm;
            time.as_tm(tm);
            printf("current time is: %s\n", asctime(&tm));
         });
         printf("Total %lu rows.\n", row_count);
         Execute();
      }
   }, mysql_server, mysql_user, mysql_password, mysql_database);
   });
}
   ev.run();
void MariaDBTest::Execute()
{
   _connection.simple_query("select * from test", 0, [](MYSQL_ROW row, int field_count) {
      for (int i = 0; i != field_count; i++)
         printf("%s\t", row[i]);
      printf("\n");
      return true;
   }, [this](const error& e, size_t row_count) {
      if (e)
         LogError(e);
      else
      {
         printf("Total %lu rows.\n", row_count);
         Query();
      }
   });
}
void MariaDBTest::Query()
{
   _connection.query("select id, name, CreateTime, Company from test", 0,
      [](int64_t id, const std::string& name, const qtl::mysql::time& create_time, const std::string& company) {
         char szTime[128] = { 0 };
         if (create_time.year != 0)
         {
            struct tm tm;
            create_time.as_tm(tm);
            strftime(szTime, 128, "%c", &tm);
         }
         printf("%lld\t%s\t%s\t%s\n", id, name.data(), szTime, company.data());
   }, [this](const error& e) {
      printf("query has completed.\n");
      if (e)
         LogError(e);
      else
         MultiQuery();
   });
}
void MariaDBTest::MultiQuery()
{
   _connection.query_multi("call test_proc",
      [this](const error& e) {
         if (e)
            LogError(e);
         else
            Close();
   }, [](uint32_t i, const std::string& str) {
      printf("0=\"%d\", 'hello world'=\"%s\"\n", i, str.data());
   }, [](const qtl::mysql::time& time) {
      struct tm tm;
      time.as_tm(tm);
      printf("current time is: %s\n", asctime(&tm));
   });
}
int main(int argc, char* argv[])
{
   ExecuteTest();
   SimpleTest();
   QueryTest();
   MultiQueryTest();
   MariaDBTest test;
   return 0;
}