I run an aiohttp application with Gunicorn behind nginx. In my application's initialization module I don't run the application with web.run_app(app); I just create an instance that Gunicorn imports and runs in each worker it creates. So Gunicorn spawns a few worker processes, an event loop within each, and then runs the application's request handlers in those loops.
My aiohttp application has a collection of connected WebSockets (mobile application clients) that I want to notify when an event occurs in any of the application processes Gunicorn started, and I want to notify all WebSockets connected to all application processes. Therefore I'm creating a kind of upstream proxy with ZeroMQ, and I want to subscribe to it with a zmq.SUB socket from each application process.
...So basically I want to achieve something like this in each application worker:

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.SUBSCRIBE, b'')  # subscribe to all messages
socket.connect('tcp://localhost:5555')

while True:
    event = socket.recv()
    for ws in app['websockets']:
        ws.send_bytes(event)
    # break before app shutdown. How?
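For completeness, the upstream proxy the workers subscribe to could itself be a plain ZeroMQ forwarder device: an XSUB front that event publishers connect to and an XPUB back that each worker's zmq.SUB socket connects to. This is only a sketch under assumptions not stated in the question; the addresses and function names are mine (port 5555 just matches the connect call above).

```python
import zmq


def make_proxy_sockets(ctx, frontend_addr, backend_addr):
    # Publishers connect PUB sockets to the XSUB side;
    # each Gunicorn worker connects its SUB socket to the XPUB side.
    frontend = ctx.socket(zmq.XSUB)
    frontend.bind(frontend_addr)
    backend = ctx.socket(zmq.XPUB)
    backend.bind(backend_addr)
    return frontend, backend


def run_proxy(frontend_addr='tcp://*:5556', backend_addr='tcp://*:5555'):
    # Hypothetical addresses; adjust to your deployment.
    ctx = zmq.Context.instance()
    frontend, backend = make_proxy_sockets(ctx, frontend_addr, backend_addr)
    try:
        # Blocks forever, forwarding messages downstream and
        # subscription frames upstream.
        zmq.proxy(frontend, backend)
    finally:
        frontend.close()
        backend.close()
```

You would run run_proxy() in a single separate process (not in a Gunicorn worker), so all workers see the same event stream.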
How can I listen to the ZeroMQ proxy from within the aiohttp application and forward messages to the WebSockets? Where should I put this code so that it runs in the background on the event loop, and how do I start and shut it down correctly within the aiohttp application's life cycle?
UPDATE
I've already created an issue in aiohttp's GitHub repository describing the problem and proposing a possible solution. I'd highly appreciate input here or there on the problem described.
Ok, the question and the discussion on this issue have led to new functionality I've contributed to aiohttp: as of version 1.0 we'll have the ability to register on_startup application signals using the Application.on_startup() method.
Documentation.
Working example on the master branch.
#!/usr/bin/env python3
"""Example of aiohttp.web.Application.on_startup signal handler"""

import asyncio

import aioredis
from aiohttp.web import Application, WebSocketResponse, run_app


async def websocket_handler(request):
    ws = WebSocketResponse()
    await ws.prepare(request)
    request.app['websockets'].append(ws)
    try:
        async for msg in ws:
            print(msg)
            await asyncio.sleep(1)
    finally:
        request.app['websockets'].remove(ws)
    return ws


async def on_shutdown(app):
    for ws in app['websockets']:
        await ws.close(code=999, message='Server shutdown')


async def listen_to_redis(app):
    try:
        sub = await aioredis.create_redis(('localhost', 6379), loop=app.loop)
        ch, *_ = await sub.subscribe('news')
        async for msg in ch.iter(encoding='utf-8'):
            # Forward message to all connected websockets:
            for ws in app['websockets']:
                ws.send_str('{}: {}'.format(ch.name, msg))
            print('message in {}: {}'.format(ch.name, msg))
    except asyncio.CancelledError:
        pass
    finally:
        print('Cancel Redis listener: close connection...')
        await sub.unsubscribe(ch.name)
        await sub.quit()
        print('Redis connection closed.')


async def start_background_tasks(app):
    app['redis_listener'] = app.loop.create_task(listen_to_redis(app))


async def cleanup_background_tasks(app):
    print('cleanup background tasks...')
    app['redis_listener'].cancel()
    await app['redis_listener']


async def init(loop):
    app = Application(loop=loop)
    app['websockets'] = []
    app.router.add_get('/news', websocket_handler)
    app.on_startup.append(start_background_tasks)
    app.on_cleanup.append(cleanup_background_tasks)
    app.on_shutdown.append(on_shutdown)
    return app


loop = asyncio.get_event_loop()
app = loop.run_until_complete(init(loop))
run_app(app)
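The core of the example is the start/cancel pattern: create_task() on startup, then cancel() plus await on cleanup so the task's finally block can tear down its connection. Here is a stdlib-only sketch of just that pattern, with no aiohttp or Redis involved; the names listener and events are illustrative, not part of any API.

```python
import asyncio


async def listener(events):
    # Stands in for listen_to_redis(): loop until cancelled.
    try:
        while True:
            await asyncio.sleep(0.01)   # stand-in for awaiting the next message
            events.append('tick')
    except asyncio.CancelledError:
        pass                            # swallow cancellation, like the example
    finally:
        events.append('closed')         # connection teardown would happen here


async def main():
    events = []
    task = asyncio.ensure_future(listener(events))   # start_background_tasks()
    await asyncio.sleep(0.05)                        # ...app serves requests...
    task.cancel()                                    # cleanup_background_tasks()
    await task   # safe to await: CancelledError was handled inside the task
    return events


events = asyncio.run(main())
assert events[-1] == 'closed'
```

Because the task catches CancelledError itself, the awaiting side doesn't need its own try/except, which is why cleanup_background_tasks() above can simply cancel and await.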