python zeromq gunicorn python-asyncio aiohttp

Listen to ZeroMQ in aiohttp application process


I run an aiohttp application with Gunicorn behind nginx. In the application's initialization module I don't call web.run_app(app); I only create an instance, which Gunicorn imports and runs in each worker it creates. So Gunicorn creates a few worker processes, each with its own event loop, and runs the application's request handler in those loops.

My aiohttp application keeps a collection of connected WebSockets (mobile application clients) that I want to notify when an event occurs in any of the application processes started by Gunicorn. The notification must reach all WebSockets connected to all application processes. To do that, I created an upstream proxy using ZeroMQ, and I want to subscribe to it with a zmq.SUB socket from each application process.

So basically I want to achieve something like this in each application worker:

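import zmq

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://localhost:5555')
# A SUB socket receives nothing until it subscribes; b'' matches every message.
socket.setsockopt(zmq.SUBSCRIBE, b'')

while True:
    event = socket.recv()  # blocking call: this is the part that needs to fit the event loop
    for ws in app['websockets']:
        ws.send_bytes(event)
    # break before app shutdown. How?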

How can I listen to the ZeroMQ proxy within the aiohttp application to forward messages to the WebSockets?

Where should I put this code so that it runs in the background within the event loop, and how do I start it and shut it down correctly within the aiohttp application's life cycle?
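
One way to read the shutdown part of the question: if the receive call is awaitable, cancelling the task that runs the loop is what breaks out of `while True`. The following is a minimal, stdlib-only sketch of that idea. The `asyncio.Queue` is a stand-in for the subscription socket, and the names `forward_events`, `delivered`, and `main` are hypothetical, chosen only for illustration.

```python
import asyncio


async def forward_events(queue, delivered):
    """Forward items from the queue until the surrounding task is cancelled."""
    try:
        while True:
            event = await queue.get()  # suspends here; cancellation is raised here
            delivered.append(event)    # stand-in for sending to each WebSocket
    except asyncio.CancelledError:
        # Release resources here, then let the cancellation propagate.
        raise


async def main():
    queue, delivered = asyncio.Queue(), []
    task = asyncio.create_task(forward_events(queue, delivered))
    await queue.put(b"event-1")
    await queue.put(b"event-2")
    await asyncio.sleep(0.01)          # give the listener a chance to run
    task.cancel()                      # this plays the role of "break" at shutdown
    try:
        await task
    except asyncio.CancelledError:
        pass
    return delivered, task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The loop itself needs no explicit exit condition: the pending `await` is where the `CancelledError` surfaces, so cleanup can live in an `except` or `finally` block around the loop.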


UPDATE

I've created an issue in aiohttp's GitHub repository that describes the problem and proposes a possible solution. I'd highly appreciate any input, here or there, on the problem described.


Solution

  • OK, this question and the discussion on that issue have led to new functionality that I've contributed to aiohttp: in version 1.0 we'll be able to register handlers for the on_startup application signal by appending them to Application.on_startup.

    Documentation.
    Working example on the master branch.

    #!/usr/bin/env python3
    """Example of aiohttp.web.Application.on_startup signal handler"""
    import asyncio
    
    import aioredis
    from aiohttp.web import Application, WebSocketResponse, run_app
    
    async def websocket_handler(request):
        ws = WebSocketResponse()
        await ws.prepare(request)
        request.app['websockets'].append(ws)
        try:
            async for msg in ws:
                print(msg)
                await asyncio.sleep(1)
        finally:
            request.app['websockets'].remove(ws)
        return ws
    
    
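    async def on_shutdown(app):
        # Iterate over a copy: each handler removes its socket from the list
        # when it closes, which would otherwise mutate the list mid-iteration.
        for ws in list(app['websockets']):
            # 1001 is the standard "going away" close code (RFC 6455).
            await ws.close(code=1001, message='Server shutdown')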
    
    
    async def listen_to_redis(app):
        try:
            sub = await aioredis.create_redis(('localhost', 6379), loop=app.loop)
            ch, *_ = await sub.subscribe('news')
            async for msg in ch.iter(encoding='utf-8'):
                # Forward message to all connected websockets.
                # (In aiohttp 3.0+ send_str() is a coroutine and must be awaited.)
                for ws in app['websockets']:
                    ws.send_str('{}: {}'.format(ch.name, msg))
                print("message in {}: {}".format(ch.name, msg))
        except asyncio.CancelledError:
            pass
        finally:
            print('Cancel Redis listener: close connection...')
            await sub.unsubscribe(ch.name)
            await sub.quit()
            print('Redis connection closed.')
    
    
    async def start_background_tasks(app):
        app['redis_listener'] = app.loop.create_task(listen_to_redis(app))
    
    
    async def cleanup_background_tasks(app):
        print('cleanup background tasks...')
        app['redis_listener'].cancel()
        await app['redis_listener']
    
    
    async def init(loop):
        app = Application(loop=loop)
        app['websockets'] = []
        app.router.add_get('/news', websocket_handler)
        app.on_startup.append(start_background_tasks)
        app.on_cleanup.append(cleanup_background_tasks)
        app.on_shutdown.append(on_shutdown)
        return app
    
    loop = asyncio.get_event_loop()
    app = loop.run_until_complete(init(loop))
    run_app(app)
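
The example above listens to Redis, while the question asks about ZeroMQ. Below is a hedged sketch of the same startup/cleanup pattern with a SUB socket, assuming a recent aiohttp (3.x) and pyzmq's `zmq.asyncio` module. The names `ZMQ_ADDRESS`, `listen_to_zmq`, and `create_app` are hypothetical, and the WebSocket route is omitted: a handler like `websocket_handler` above is assumed to populate `app['websockets']`.

```python
import asyncio

import zmq
import zmq.asyncio
from aiohttp import web

ZMQ_ADDRESS = "tcp://localhost:5555"  # proxy address taken from the question


async def listen_to_zmq(app):
    ctx = zmq.asyncio.Context()
    sock = ctx.socket(zmq.SUB)
    sock.connect(ZMQ_ADDRESS)
    sock.setsockopt(zmq.SUBSCRIBE, b"")  # empty prefix: receive every message
    try:
        while True:
            event = await sock.recv()    # yields to the event loop while waiting
            # Copy the collection so handlers may add or remove sockets meanwhile.
            for ws in list(app["websockets"]):
                await ws.send_bytes(event)
    finally:
        # Runs on cancellation too; linger=0 discards any pending messages.
        sock.close(linger=0)
        ctx.term()


async def start_background_tasks(app):
    app["zmq_listener"] = asyncio.create_task(listen_to_zmq(app))


async def cleanup_background_tasks(app):
    app["zmq_listener"].cancel()
    try:
        await app["zmq_listener"]
    except asyncio.CancelledError:
        pass


def create_app():
    app = web.Application()
    app["websockets"] = []
    app.on_startup.append(start_background_tasks)
    app.on_cleanup.append(cleanup_background_tasks)
    return app
```

With Gunicorn, a module-level `app = create_app()` can be imported as the question describes; the startup handlers then run in every worker, so each process gets its own SUB socket. Note that this sketch does not handle a failing `send_bytes()` (for example, a client that disconnected mid-send); a production listener would probably catch that per socket so one bad client does not stop the loop.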