
Sharing a txpostgres connection pool


We have a RESTful(-ish) twisted application that uses txpostgres to access a postgres db. Currently, we generate new txpostgres.Connection instances every time a client pings the server for a db call. This is inefficient and results in our db quickly getting overwhelmed. I've been trying to adapt this to use txpostgres.ConnectionPool instead, but am running into trouble. Right now I have something that looks like this:

from twisted.internet import defer, reactor
from txpostgres.txpostgres import ConnectionPool


class DBTester(object):
    def __init__(self):
        self.cfg = load_config('local')  # load the db settings from a JSON file
        self.pool = ConnectionPool(None, min=1, **self.cfg)  # create the pool

    @defer.inlineCallbacks
    def get_pool(self):
        yield self.pool.start()
        defer.returnValue(self.pool)


class DBT(object):
    def __init__(self):
        self.db = DBTester()

    @defer.inlineCallbacks
    def t(self):
        conn = yield self.db.get_pool()
        res = yield conn.runQuery('select * from clients')
        print('DBT.t result: {}'.format(res))


if __name__ == "__main__":
    dbt = DBT()
    dbt.t()
    dbt.t()

    reactor.run()

The issue is the timing of the pool.start() call. If I put it in DBTester.__init__, I get psycopg2.OperationalError: asynchronous connection attempt underway. If I put it in DBTester.get_pool instead, the first dbt.t() call works, but the others fail with exceptions.AttributeError: 'NoneType' object has no attribute 'runQuery'. I've been struggling with this basically all day and haven't been able to crack it, nor have I been able to find much online.
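For context, the second failure pattern typically means pool.start() is being invoked once per request rather than once per process. A plain-Python sketch of the start-once idea (SharedPool and fake_start are hypothetical stand-ins, not txpostgres API; in Twisted you would cache the Deferred returned by pool.start() the same way):

```python
class SharedPool(object):
    """Start the underlying pool at most once; every caller gets the
    same started pool back (stand-in for caching pool.start()'s Deferred)."""

    def __init__(self, make_pool):
        self._make_pool = make_pool
        self._pool = None

    def get(self):
        if self._pool is None:       # only the first caller triggers a start
            self._pool = self._make_pool()
        return self._pool            # later callers reuse the same pool


starts = []

def fake_start():
    starts.append(1)                 # count how often we "connect"
    return "pool"

shared = SharedPool(fake_start)
assert shared.get() is shared.get()  # both callers see the same pool
assert len(starts) == 1              # and the pool was started exactly once
```

The same effect can be had by starting the pool before the first request is ever served, which is the direction the accepted answer below takes.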

I really just need a pointer to some minimal example of how ConnectionPool is used. Any suggestions?


Solution

  • I don't know if this counts as best practices, but here's what we're going with:

    ## dbutil.py
    import json

    from twisted.internet import defer
    from txpostgres import txpostgres


    class DBConnection(object):
        def __init__(self, cfg_name):
            self.cfg_name = cfg_name
            self.cfg = self.load_config(self.cfg_name)
            self.pool = txpostgres.ConnectionPool(None, min=5, **self.cfg)
    
        @staticmethod
        def load_config(name):
            with open('config.json') as json_file:
                cfg = json.load(json_file)
            return cfg.get(name)
    
        @defer.inlineCallbacks
        def get_pool(self):
            try:
                yield self.pool.start()
            except txpostgres.AlreadyConnected:
                pass
            defer.returnValue(self.pool)
    
    
    @defer.inlineCallbacks
    def db_factory(cfg_name):
        db = DBConnection(cfg_name)
        db.pool = yield db.get_pool()
        defer.returnValue(db)
    
    
    ## basehandler.py
    class BaseHandler(resource.Resource):  # class name assumed; the original showed only the methods
        def __init__(self, name=None, db=None):
            resource.Resource.__init__(self)
            self.name = name
            self.db = db
            self.pool = self.db.pool

        @defer.inlineCallbacks
        def runQuery(self, *args, **kwargs):
            res = yield self.pool.runQuery(*args, **kwargs)
            defer.returnValue(res)
    
    
    ## server.py
    from twisted.internet import reactor
    from twisted.python import log
    from twisted.python.logfile import LogFile
    from twisted.web import server

    @defer.inlineCallbacks
    def init_site(cfg_name):
        db = yield db_factory(cfg_name)
        root = RootURLHandler(db)
        site = server.Site(root)  # wrap the root resource in a Site factory
        reactor.listenTCP(SERVER_PORT, site)

    def main(cfg_name):
        log.startLogging(LogFile('server.log', '.', maxRotatedFiles=5))
        init_site(cfg_name)
        reactor.run()
    

    The key, perhaps unsurprisingly, was making site initialization a deferred contingent on the database setup completing first.
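That ordering can be illustrated without a reactor or database at all. Below is a toy stand-in for Twisted's Deferred (ToyDeferred, start_pool, and init_site here are illustrative mocks, not the real twisted or txpostgres objects) showing that the site-setup callback only runs once the pool's start result has fired:

```python
class ToyDeferred(object):
    """Minimal mock of a Deferred: callbacks run after the result fires."""

    def __init__(self):
        self._callbacks = []
        self._fired = False
        self._result = None

    def add_callback(self, fn):
        if self._fired:                     # already resolved: run immediately
            self._result = fn(self._result)
        else:                               # not yet: queue for later
            self._callbacks.append(fn)
        return self

    def callback(self, result):
        self._fired = True
        self._result = result
        for fn in self._callbacks:
            self._result = fn(self._result)


events = []

def start_pool():
    d = ToyDeferred()
    events.append("pool starting")
    d.callback("pool")  # txpostgres would fire this once connections are ready
    return d

def init_site(pool):
    events.append("site listening with " + pool)
    return pool

# Site initialization is chained onto the pool's start deferred, so it can
# never observe a half-started pool.
start_pool().add_callback(init_site)
print(events)  # ['pool starting', 'site listening with pool']
```

In the real code the same chaining happens implicitly inside the inlineCallbacks generator: the `yield db_factory(...)` line suspends init_site until the pool has finished starting.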