Search code examples
pythonmysqldatabase-connectionconnection-poolingpymysql

Python mysql (using pymysql) auto reconnect


I'm not sure if this is possible, but I'm looking for a way to reconnect to mysql database when the connection is lost. All the connections are held in a gevent queue but that shouldn't matter I think. I'm sure if I put some time in, I can come up with a way to reconnect to the database. However I was glancing pymysql code and I saw that there is a 'ping' method in Connection class, which I'm not sure exactly how to use.

The method looks like it will reconnect first time but after that it switched the reconnect flag to False again? Can I use this method, or is there a different way to establish connection if it is lost? Even if it is not pymysql how do people tackle, database servers going down and having to re-establish connection to mysql server?

def ping(self, reconnect=True):
    ''' Check if the server is alive '''
    if self.socket is None:
        if reconnect:
            self._connect()
            reconnect = False
        else:
            raise Error("Already closed")
    try:
        self._execute_command(COM_PING, "")
        return self._read_ok_packet()
    except Exception:
        if reconnect:
            self._connect()
            return self.ping(False)
        else:
            raise

Solution

  • Finally got a working solution, might help someone.

    from gevent import monkey
    monkey.patch_socket()
    import logging
    
    import gevent
    from gevent.queue import Queue
    import pymysql as db
    
    logging.basicConfig(level=logging.DEBUG)
    LOGGER = logging.getLogger("connection_pool")
    
    
    class ConnectionPool:
        def __init__(self, db_config, time_to_sleep=30, test_run=False):
            self.username = db_config.get('user')
            self.password = db_config.get('password')
            self.host = db_config.get('host')
            self.port = int(db_config.get('port'))
            self.max_pool_size = 20
            self.test_run = test_run
            self.pool = None
            self.time_to_sleep = time_to_sleep
            self._initialize_pool()
    
        def get_initialized_connection_pool(self):
            return self.pool
    
        def _initialize_pool(self):
            self.pool = Queue(maxsize=self.max_pool_size)
            current_pool_size = self.pool.qsize()
            if current_pool_size < self.max_pool_size:  # this is a redundant check, can be removed
                for _ in xrange(0, self.max_pool_size - current_pool_size):
                    try:
                        conn = db.connect(host=self.host,
                                          user=self.username,
                                          passwd=self.password,
                                          port=self.port)
                        self.pool.put_nowait(conn)
    
                    except db.OperationalError, e:
                        LOGGER.error("Cannot initialize connection pool - retrying in {} seconds".format(self.time_to_sleep))
                        LOGGER.exception(e)
                        break
            self._check_for_connection_loss()
    
        def _re_initialize_pool(self):
            gevent.sleep(self.time_to_sleep)
            self._initialize_pool()
    
        def _check_for_connection_loss(self):
            while True:
                conn = None
                if self.pool.qsize() > 0:
                    conn = self.pool.get()
    
                if not self._ping(conn):
                    if self.test_run:
                        self.port = 3306
    
                    self._re_initialize_pool()
    
                else:
                    self.pool.put_nowait(conn)
    
                if self.test_run:
                    break
                gevent.sleep(self.time_to_sleep)
    
        def _ping(self, conn):
            try:
                if conn is None:
                    conn = db.connect(host=self.host,
                                      user=self.username,
                                      passwd=self.password,
                                      port=self.port)
                cursor = conn.cursor()
                cursor.execute('select 1;')
                LOGGER.debug(cursor.fetchall())
                return True
    
            except db.OperationalError, e:
                LOGGER.warn('Cannot connect to mysql - retrying in {} seconds'.format(self.time_to_sleep))
                LOGGER.exception(e)
                return False
    
    # test (pytest compatible) -------------------------------------------------------------------------------------------
    import logging
    
    from src.py.ConnectionPool import ConnectionPool
    
    logging.basicConfig(level=logging.DEBUG)
    LOGGER = logging.getLogger("test_connection_pool")
    
    
    def test_get_initialized_connection_pool():
        config = {
            'user': 'root',
            'password': '',
            'host': '127.0.0.1',
            'port': 3305
        }
        conn_pool = ConnectionPool(config, time_to_sleep=5, test_run=True)
        pool = conn_pool.get_initialized_connection_pool()
        # when in test run the port will be switched back to 3306
        # so the queue size should be 20 - will be nice to work 
        # around this rather than test_run hack
        assert pool.qsize() == 20