Search code examples
mysql, python-3.x, connection-pooling, mysql-python

MySQL connection pooling in separate DB class - howto?


I'm writing an application where I've moved all the MySQL connection setup and teardown into a class, which I instantiate inside individual function calls using a `with` statement.

Now that the development is all done, I'm optimizing and would like to set up connection pooling, but I can't for the life of me figure out how. If I initialize the pool when I set up the object in `__enter__`, won't that create a new pool for each object?

If I put the pool setup in the global of the module, then how do I ensure I set up the pool before I start creating DB objects?

My DB code looks somewhat like this:

# Setting up details for connecting to a local MariaDB/MySQL instance
# replace with suitable code/module when porting to cloud/production
import sys
import mysql.connector
"""Module for abstracting database connectivity 
Import this module and then call run_query(), run_query_vals() or run_query_no_return()    """

# Public API exported by `from <module> import *`: the context-manager class
# plus the three exception types defined in this module.
# NOTE(review): the exported ConnectionError shadows Python 3's built-in
# exception of the same name in any namespace that star-imports this module.
__all__ = ['UseDatabase', 'CredentialsError', 'ConnectionError', 'SQLError']

class ConnectionError(Exception):
    """Signals that the database server could not be reached.

    Derives directly from Exception and, inside this module, takes the place
    of Python's built-in ConnectionError.
    """


class CredentialsError(Exception):
    """Signals that the database rejected the supplied user ID or password."""

class SQLError(Exception):
    """Signals that a SQL statement was rejected by the database."""

# Default connection settings, kept here as a reminder of the expected keys.
# They also double as the default server details while doing unit testing.
# NOTE(review): credentials are hard-coded; replace before porting to
# cloud/production.
dbconfig = { 'host': '127.0.0.1', 'user' : 'statdev', 'password' : 'statdev', 'database': 'stat',}

class UseDatabase:
    """Context manager wrapping one MySQL connection and a dictionary cursor.

    Usage::

        with UseDatabase(config) as db:
            rows = db.run_query_vals('select ... where id = %s', (42,)).fetchall()

    Entering the block opens the connection and creates a cursor that returns
    rows as dictionaries. Leaving the block commits when the body finished
    normally and rolls back when it raised; the cursor and the connection are
    closed in both cases.
    """

    # Keyword arguments passed to mysql.connector.connect(); set per instance.
    config = None

    def __init__(self, config: dict):
        """Store the connection settings; no connection is opened yet."""
        self.config = config

    def __enter__(self) -> 'UseDatabase':
        """Open the connection and cursor and return this object.

        Raises:
            ConnectionError: the server could not be reached.
            CredentialsError: the server rejected the login details.
            Exception: any other failure is printed and re-raised unchanged,
                so the ``with`` body never runs without a usable connection.
        """
        try:
            self.conn = mysql.connector.connect(**self.config)
            try:
                self.cursor = self.conn.cursor(dictionary=True)
            except Exception:
                # Do not leak the connection when the cursor cannot be created.
                self.conn.close()
                raise
        except mysql.connector.InterfaceError as err:
            print('Can\'t connect to Database - is it available? \nError: ', str(err))
            raise ConnectionError(err) from err
        except mysql.connector.ProgrammingError as err:
            print('Invalid credentials - please check ID/Password. \nError: ', str(err))
            raise CredentialsError(err) from err
        except mysql.connector.IntegrityError as err:
            print("Error: {}".format(err))
            # Previously swallowed, which bound None to the ``with`` target.
            raise
        except Exception as err:
            print('Something else went wrong:', str(err))
            # Previously *returned* the exception, which bound it to the
            # ``with`` target as if it were a database object.
            raise
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Finish the transaction and release the cursor and connection.

        Commits when the ``with`` body completed normally; rolls back when it
        raised, so a failed block never persists partial work.

        Raises:
            SQLError: the body raised mysql.connector's ProgrammingError.

        Any other exception from the body propagates unchanged (same type,
        original traceback) because this method returns a false value.
        """
        try:
            if exc_type is None:
                self.conn.commit()
            else:
                self.conn.rollback()
        finally:
            # Always release both resources, even if commit/rollback failed.
            try:
                self.cursor.close()
            finally:
                self.conn.close()

        if exc_type is None:
            return False
        # Identity check kept from the original: only ProgrammingError itself
        # (not subclasses) is translated to SQLError.
        if exc_type is mysql.connector.errors.ProgrammingError:
            print('Error in SQL Code - please check the query. \nError: ', str(exc_type))
            raise SQLError(exc_value) from exc_value
        print('Something else went wrong\n', str(exc_type))
        return False

    def run_query(self, query_str):
        """Execute ``query_str`` without parameters and return the cursor."""
        self.cursor.execute(query_str, None)
        return self.cursor

    def run_query_vals(self, query_str, tupleval):
        """Execute ``query_str`` with the parameters in ``tupleval``.

        ``tupleval`` is handed to the cursor's ``execute`` as the parameter
        sequence (``None`` means no parameters). Returns the cursor so the
        caller can fetch results from it.
        """
        self.cursor.execute(query_str, tupleval)
        return self.cursor

    def run_query_no_return(self, query_str):
        """Execute a statement that yields no rows and return the cursor."""
        self.cursor.execute(query_str)
        return self.cursor

def test():
    """Smoke-test the module against the default server in ``dbconfig``.

    Runs ``Select NULL from dual`` and prints a success message when the row
    comes back as the dictionary ``{'NULL': None}``; prints nothing otherwise.
    """
    with UseDatabase(dbconfig) as db:
        cursor = db.run_query_vals('Select NULL from dual', None)
        row = cursor.fetchone()
        if row != {'NULL': None}:
            return
        message = ("DB Module Test was successful! \n"
                   "Queries return values in dictionaries."
                   "\nTest query \'Select NULL from dual\' returned result: %s")
        print(message % str(row))


# Run the connectivity self-test only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    test()

Solution

  • This has worked for me but I am not sure it's a perfect solution as, for example, trying to do multiple inserts via a for loop results in a 'Failed getting connection; pool exhausted' error. I did not have this problem when I was using a function-based (non class-based) connection pool. Anyway, to avoid this problem I just simply use 'cursor.executemany' in one go.

    Hope this helps someone!

    from mysql.connector.pooling import MySQLConnectionPool
    from mysql.connector.errors import ProgrammingError, InterfaceError
    from settings import config
    
    
    # Database connection pool settings, both read from the imported `config`
    # object: `dbconfig` is expanded into MySQLConnectionPool's keyword
    # arguments in create_pool(), and `dbconfig_pool` must provide the
    # 'pool_name' and 'pool_size' keys read in __init__.
    dbconfig = config.dbconfig
    dbconfig_pool = config.dbconfig_pool
    
    #The following is my 'class DBasePool' content:
    def __init__(self, dbconfig, dbconfig_pool):
        """Create the connection pool and check out one connection from it.

        Args:
            dbconfig: mapping of connection keyword arguments; stored on the
                instance and expanded into the pool constructor by
                create_pool().
            dbconfig_pool: mapping with the keys 'pool_name' and 'pool_size'.

        Raises:
            ConnectionError: an InterfaceError occurred during setup.
            CredentialsError: a ProgrammingError occurred during setup.
            Exception: any other failure is logged and re-raised unchanged.
        """
        self.dbconfig = dbconfig
        self.pool_name = dbconfig_pool['pool_name']
        self.pool_size = dbconfig_pool['pool_size']
        try:
            self.cnxpool = self.create_pool(pool_name=self.pool_name, pool_size=self.pool_size)
            # NOTE(review): this connection is never released in the code
            # shown here, while execute() checks out its own connection on
            # every call. If self.cnx / self.cursor are unused elsewhere they
            # permanently occupy one pool slot -- confirm and remove if so.
            self.cnx = self.cnxpool.get_connection()
            self.cursor = self.cnx.cursor(buffered=True)

        except InterfaceError as e:
            logger.error(e)
            # Chain explicitly so the driver error stays attached as __cause__.
            raise ConnectionError(e) from e
        except ProgrammingError as e:
            logger.error(e)
            raise CredentialsError(e) from e
        except Exception as e:
            logger.error(e)
            raise
    
    def create_pool(self, pool_name, pool_size):
        """Build and return a MySQLConnectionPool.

        The pool gets the given name and size; every entry of self.dbconfig
        is forwarded as an additional keyword argument.
        """
        pool = MySQLConnectionPool(
            pool_name=pool_name,
            pool_size=pool_size,
            **self.dbconfig
        )
        return pool
    
    def close(self, cnx, cursor):
        """Close ``cursor`` first, then ``cnx``."""
        for resource in (cursor, cnx):
            resource.close()
    
    def execute(self, sql, data=None):
        """Run one statement on a pooled connection and return its row count.

        A connection is checked out of the pool for this call only. The cursor
        and connection are always released through self.close() -- also when
        no rows were affected or the statement raised -- so repeated calls
        cannot exhaust the pool.

        Args:
            sql: the SQL statement to execute.
            data: optional parameters passed to the cursor's ``execute``.

        Returns:
            The cursor's ``rowcount`` after committing when it is non-zero;
            otherwise 0, after printing a diagnostic message (no commit).
        """
        # Get connection from connection pool instead of creating one
        cnx = self.cnxpool.get_connection()
        try:
            cursor = cnx.cursor(buffered=True)
        except Exception:
            # No cursor to close yet; still hand the connection back.
            cnx.close()
            raise

        try:
            cursor.execute(sql, data)
            rowcount = cursor.rowcount
            if rowcount:
                cnx.commit()
                return rowcount
            print('Could not insert record(s): {}, {}'.format(sql, data))
            return 0
        finally:
            self.close(cnx, cursor)