Search code examples
python postgresql asyncpg

asyncpg add listener to Pool


Good afternoon.

I'm trying to use the NOTIFY/LISTEN feature in PostgreSQL. As seen in the asyncpg docs, we can add a listener to a Connection object, but not to a Pool. I've tried the solutions shown in this issue; my code is below:

def listener(*notification):
    """Notification callback: ignore whatever is passed in and print a marker.

    Accepts any positional arguments so it can be registered as a listener
    regardless of how many values the caller supplies.
    """
    print("ANYTHING")

async def main():
    """Create a pool whose ``setup`` hook subscribes to a channel, then idle.

    The hook attaches ``listener`` to ``listener_channel`` on the connection
    it is given. After the pool is created the coroutine just sleeps.
    """
    async def subscribe(connection) -> None:
        # Passed to create_pool(setup=...); registers the module-level callback.
        await connection.add_listener("listener_channel", listener)

    # Connection parameters forwarded to create_pool (left empty here).
    creds = {}
    pool = await asyncpg.create_pool(setup=subscribe, **creds)
    await asyncio.sleep(10000)


# Script entry point: drive the main() coroutine to completion.
asyncio.run(main())

And then running

NOTIFY listener_channel

In PgAdmin4.

However - nothing happens. How could I make it work?


Solution

  • Alright, so it seems that it's not working because all of the connections are idle. I came up with this solution:

    import asyncio
    import asyncpg
    
    
    class ListenerConnection(asyncpg.Connection):
        """Connection on which a coroutine can wait for the next NOTIFY.

        ``await conn.listen(channel)`` suspends the caller until a
        notification is delivered on ``channel``. Each waiter is woken once;
        call ``listen`` again to wait for the following notification.
        """

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # Futures still waiting for a notification, grouped by channel.
            self._listeners_futures: dict[str, list[asyncio.Future]] = {}
            # One wake-up callback per channel, so repeated listen() calls
            # hand the same callable to add_listener() instead of a fresh
            # closure each time.
            self._listeners_callbacks: dict[str, object] = {}

        def _dummy_callback(self, channel):
            """Return a callback that wakes every coroutine waiting on *channel*.

            The returned callable accepts and ignores any positional arguments.
            """
            def wrapper(*args):
                # pop() detaches the whole batch at once, so no future is ever
                # resolved twice and nothing is removed while iterating.
                for fut in self._listeners_futures.pop(channel, ()):
                    # A waiter may have been cancelled in the meantime;
                    # set_result() on a finished future would raise.
                    if not fut.done():
                        fut.set_result(None)
            return wrapper

        async def add_one_time_listener(self, channel):
            """Register this connection's wake-up callback for *channel*.

            The callback is created once per channel and reused afterwards.
            add_listener() is still called every time so the subscription is
            re-established if it was dropped in between.
            NOTE(review): assumes add_listener() tolerates being given the same
            callable again without duplicating it -- confirm against asyncpg.
            """
            callback = self._listeners_callbacks.get(channel)
            if callback is None:
                callback = self._dummy_callback(channel)
                self._listeners_callbacks[channel] = callback
            await self.add_listener(channel, callback)

        async def listen(self, channel):
            """Wait until the next notification arrives on *channel*.

            Returns ``None`` once a notification has been received.
            """
            await self.add_one_time_listener(channel)
            future = asyncio.get_running_loop().create_future()
            self._listeners_futures.setdefault(channel, []).append(future)

            try:
                return await future
            finally:
                # If the wait was cancelled the future is still queued; drop
                # it so the list does not grow with dead waiters.
                pending = self._listeners_futures.get(channel)
                if pending is not None and future in pending:
                    pending.remove(future)
    
    
    
    async def main():
        """Wait for one notification on ``some_channel`` and print a marker.

        Opens a pool that builds ``ListenerConnection`` objects, acquires one
        connection, blocks in ``listen`` until a notification arrives, prints,
        and then closes the pool on exit.
        """
        # Connection parameters (host, user, database, ...); fill in as needed.
        creds = {}
        # Using the pool as an async context manager closes it when done.
        async with asyncpg.create_pool(
            **creds, connection_class=ListenerConnection
        ) as pool:
            async with pool.acquire() as conn:
                await conn.listen("some_channel")
                print("ANYTHING")
    
    
    # Script entry point: drive the main() coroutine to completion.
    asyncio.run(main())