znone
2019-06-16 ebd997c5ce55fd260696f9bb117f1f06c09a4759
include/qtl_database_pool.hpp
@@ -137,6 +137,7 @@
         return;
      m_trying_connection=true;
      try
      {
         m_background_thread=std::thread(&database_pool<Database>::background_connect, this);
@@ -175,6 +176,170 @@
   }
};
/*
   Asynchronous connection pool, used as a CRTP base class.

   Template parameters:
      T          - the derived pool type. It must provide
                      void new_connection(EventLoop& ev, Handler&& handler);
                   which eventually invokes
                      handler(const typename Connection::exception_type& e, Connection* db);
                   with a heap allocated connection on success or a null pointer on failure.
      EventLoop  - the event loop type; only set_timeout(timeval, callback) is used here.
      Connection - the connection type. The pool uses Connection::exception_type
                   (default constructible, testable in a boolean context), bind(EventLoop&),
                   unbind() and is_alive(callback).

   Idle connections are owned by the pool as raw pointers and guarded by m_pool_mutex.
   Connections handed out by get() are wrapped in a shared_ptr whose deleter calls back
   into this pool, so the pool must outlive every pointer it has handed out.
*/
template<typename T, typename EventLoop, typename Connection>
class async_pool
{
public:
   typedef Connection value_type;
   typedef std::shared_ptr<Connection> pointer;

   async_pool(EventLoop& ev)
      : m_ev(ev), m_trying_connecting(false)
   {
   }

   /* The pool owns raw connection pointers, so it must never be copied. */
   async_pool(const async_pool&) = delete;
   async_pool& operator=(const async_pool&) = delete;

   virtual ~async_pool()
   {
      clear();
   }

   /*
      Obtains a connection and passes it to handler.

      Handler defines as:
      void handler(const typename Connection::exception_type& e, const pointer& ptr);

      - An idle connection is bound to the event loop and delivered immediately.
      - If the pool is empty, a new connection is requested from the derived class and
        delivered when it is ready; on failure ptr is null and e is the reported error.
      - If the pool is empty while a background reconnect is in progress, the handler
        receives a default constructed error and a null pointer.

      ev selects the event loop to bind to; the pool's own loop is used when it is null.
   */
   template<typename Handler>
   void get(Handler&& handler, EventLoop* ev=nullptr)
   {
      Connection* db = popup();
      if(ev==nullptr) ev=&m_ev;

      if(db)
      {
         db->bind(*ev);
         handler(typename Connection::exception_type(), wrap(db));
      }
      else if (m_trying_connecting == false)
      {
         create_connection(ev, [this, handler](const typename Connection::exception_type& e,  Connection* db) {
            handler(e, wrap(db));
         });
      }
      else
      {
         // Pass a real pointer object so every branch hands the handler the same type.
         handler(typename Connection::exception_type(), pointer());
      }
   }

   /*
      Checks every idle connection with is_alive() and destroys the ones reporting an error.
      When the last idle connection is removed this way, a background reconnect is started.
   */
   void test_alive()
   {
      std::lock_guard<std::recursive_mutex> lock(m_pool_mutex);

      // Iterate over a snapshot: a callback that completes synchronously erases from
      // m_connections and would otherwise invalidate the iteration.
      const std::vector<Connection*> snapshot(m_connections);
      for (Connection* db : snapshot)
      {
         db->is_alive([this, db](const typename Connection::exception_type& e) {
            if (e)
            {
               bool pool_empty = false;
               {
                  std::lock_guard<std::recursive_mutex> guard(m_pool_mutex);
                  auto it = std::find(m_connections.begin(), m_connections.end(), db);
                  // The connection may have left the pool (handed out by get() or
                  // released by clear()) before this callback ran; it is not ours
                  // to destroy in that case.
                  if (it == m_connections.end())
                     return;
                  m_connections.erase(it);
                  delete db;
                  pool_empty = m_connections.empty();
               }
               if (pool_empty)
                  try_connect();
            }
         });
      }
   }

private:
   EventLoop& m_ev;
   std::vector<Connection*> m_connections;   // idle connections, owned by the pool
   std::recursive_mutex m_pool_mutex;        // guards m_connections
   std::atomic<bool> m_trying_connecting;    // true while a background reconnect is running

   /*
      Deleter target for pointers created by wrap(): takes a connection back.

      A healthy connection is unbound and returned to the idle list. If the liveness
      check fails, all idle connections are dropped, the failed connection is destroyed
      and a background reconnect is started.

      Throws std::runtime_error from the is_alive() callback if unbind() reports failure.
   */
   void recovery(Connection* db)
   {
      if (db == nullptr) return;
      db->is_alive([this, db](const typename Connection::exception_type& e) {
         if (e)
         {
            {
               std::lock_guard<std::recursive_mutex> lock(m_pool_mutex);
               clear();
            }
            // db was handed out, so it is not in m_connections and clear() did not
            // release it; destroy it here or it is leaked.
            delete db;
            try_connect();
         }
         else
         {
            if(!db->unbind())
               throw std::runtime_error("destroy a busysing connection.");
            std::lock_guard<std::recursive_mutex> lock(m_pool_mutex);
            m_connections.push_back(db);
         }
      });
   }

   /*
      Asks the derived class for a new connection and forwards the result to handler.
      On failure the idle list is cleared and a reconnect attempt is scheduled on the
      pool's event loop after one second.
   */
   template<typename Handler>
   void create_connection(EventLoop* ev, Handler&& handler)
   {
      T* pThis = static_cast<T*>(this);
      pThis->new_connection(*ev, [this, handler](const typename Connection::exception_type& e, Connection* db) {
         handler(e, db);
         if (!db)
         {
            std::lock_guard<std::recursive_mutex> lock(m_pool_mutex);
            clear();

            timeval tv = { 1,  0};
            m_ev.set_timeout(tv, [this]() {
               try_connect();
            });
         }
      });
   }

   /* Removes and returns the most recently stored idle connection, or null if none. */
   Connection* popup()
   {
      Connection* db = nullptr;
      std::lock_guard<std::recursive_mutex> lock(m_pool_mutex);
      if (!m_connections.empty())
      {
         db = m_connections.back();
         m_connections.pop_back();
      }
      return db;
   }

   /*
      Starts a background reconnect unless one is already running. A connection obtained
      this way goes straight into the idle list.
   */
   void try_connect()
   {
      // Atomic test-and-set so two callers cannot both start a reconnect.
      if (m_trying_connecting.exchange(true))
         return;

      create_connection(&m_ev, [this](const typename Connection::exception_type&, Connection* db) {
         if (db)
         {
            std::lock_guard<std::recursive_mutex> lock(m_pool_mutex);
            m_connections.push_back(db);
         }
         // The attempt is over either way. Leaving the flag set after a success would
         // make get() return null for an empty pool forever; clearing it after a
         // failure lets the retry scheduled by create_connection() run.
         m_trying_connecting = false;
      });
   }

   /* Destroys every idle connection. Callers that may race with others hold m_pool_mutex. */
   void clear()
   {
      for (Connection* db : m_connections)
         delete db;
      m_connections.clear();
   }

   /* Wraps a raw connection so that releasing the last reference returns it to the pool. */
   pointer wrap(Connection* db)
   {
      if (db)
      {
         return pointer(db, [this](Connection* db) {
            recovery(db);
         });
      }
      else return nullptr;
   }
};
}
#endif //_QTL_DATABASE_POOL_H_