I have the following simple Python web server application. When I call /sleep, all other calls to /quick are blocked until the sleep ends and the response is returned. I am not sure what is wrong with this code. Can someone provide some clarity?
from aiohttp import web
import asyncio
import time
async def handle(request):
    """Reply immediately with a plain-text greeting.

    The greeted name is read from the route match info under ``name``;
    "Anonymous" is used when no such entry is present.
    """
    who = request.match_info.get('name', "Anonymous")
    return web.Response(text="Hello, " + who)
async def sleephandle(request):
    """Reply with a greeting after a 12-second synchronous sleep.

    NOTE: ``time.sleep`` is a blocking call made directly inside a
    coroutine, so the event loop cannot run any other handler until it
    returns. This is the behaviour being asked about and is kept as-is.
    """
    name = request.match_info.get('name', "Anonymous")
    time.sleep(12)  # trivializing here; actual code has a transition from async to sync
    text = "Hello, " + name
    return web.Response(text=text)
async def init(loop):
    """Create the application, register its routes and start listening.

    Args:
        loop: event loop used both for the application and for creating
            the listening server.

    Returns:
        The server object produced by ``loop.create_server`` bound to
        127.0.0.1:8000.
    """
    application = web.Application(loop=loop)
    router = application.router
    router.add_get('/quick', handle)
    router.add_get('/sleep', sleephandle)
    # The handler factory is what the loop uses to serve each connection.
    protocol_factory = application.make_handler()
    server = await loop.create_server(protocol_factory, '127.0.0.1', 8000)
    print('server started')
    return server
def create_server():
    """Start the server and serve requests until the loop is stopped.

    Creates a fresh event loop, installs it as the current loop, runs
    ``init`` to bring the server up and then runs the loop forever. The
    loop is closed when ``run_forever`` exits (for example on
    ``KeyboardInterrupt``).
    """
    # asyncio.get_event_loop() is deprecated when no loop is running and
    # raises on recent Python versions, so create the loop explicitly.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(init(loop))
        loop.run_forever()
    finally:
        loop.close()
# Script entry point: start the server when this file is executed.
create_server()
The key idea of my solution is to use loop.run_in_executor
with the pool type that fits your case. You can solve the problem in the following way:
from aiohttp import web
import asyncio
import time
import logging
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
def blocking_code(seconds=12):
    """Simulate some long-running, blocking work.

    Args:
        seconds: how long to block the calling thread. Defaults to 12,
            so existing zero-argument callers behave exactly as before.

    Returns:
        The fixed marker string "!!!!".

    Raises:
        ValueError: if ``seconds`` is negative (raised by ``time.sleep``).
    """
    time.sleep(seconds)
    return "!!!!"
async def blocking_code_task(loop: asyncio.BaseEventLoop, request: web.Request):
    """Run ``blocking_code`` in the app's worker pool and log its result.

    Intended to be wrapped in an asyncio Task so the caller does not have
    to wait for the blocking work to finish.

    Args:
        loop: event loop used to schedule the work on the executor.
        request: current request; its application holds the executor
            under the "workers_pool" key.
    """
    pool = request.app["workers_pool"]
    outcome = await loop.run_in_executor(pool, blocking_code)
    logging.info(f"{datetime.now()}: {outcome}")
async def handle(request: web.Request):
    """Answer right away with a plain-text greeting.

    Uses the ``name`` entry of the route match info, or "Anonymous" when
    it is absent.
    """
    greeted = request.match_info.get('name', "Anonymous")
    body = "".join(("Hello, ", greeted))
    return web.Response(text=body)
async def sleephandle(request: web.Request):
    """Wait for the blocking work to finish, then send the response.

    The blocking function runs in the executor stored in the application
    under "workers_pool", so the event loop stays free to serve other
    requests while this handler awaits the result.

    Returns:
        A plain-text response: the greeting followed by the value
        returned by ``blocking_code``.
    """
    name = request.match_info.get('name', "Anonymous")
    # Inside a coroutine the running loop is the one to use;
    # get_running_loop() states that intent and never creates a new loop.
    loop = asyncio.get_running_loop()
    # Await here because the response needs the result.
    r = await loop.run_in_executor(request.app["workers_pool"], blocking_code)
    text = "Hello, " + name + r
    return web.Response(text=text)
# Strong references to in-flight background tasks. The event loop only
# keeps weak references to tasks, so a task whose result is discarded may
# be garbage-collected before it finishes.
_background_tasks = set()


async def fast_sleep_answer(request: web.Request):
    """Respond as fast as possible and do the work in a separate asyncio Task.

    The blocking work is scheduled through ``blocking_code_task`` and is
    not awaited, so the response is returned without waiting for it.

    Returns:
        A plain-text response built from "Fast answer" and the name.
    """
    name = request.match_info.get('name', "Anonymous")
    loop = asyncio.get_running_loop()
    # Fire and forget: we do not wait for the result here.
    task = asyncio.create_task(blocking_code_task(loop, request))
    # Keep the task alive until it completes, then drop the reference.
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    text = "Fast answer" + name
    return web.Response(text=text)
async def on_shutdown(app):
    """Shutdown hook: close the worker pool stored in the application.

    Args:
        app: the application whose "workers_pool" executor is shut down.
    """
    workers = app["workers_pool"]
    workers.shutdown()
    closed_at = datetime.now()
    logging.info(f"{closed_at}: Pool is closed")
async def init(args=None):
    """Build the application: routes, worker pool and shutdown hook.

    Args:
        args: not used by this function. Presumably present so the
            function can serve as the ``x:init`` entry point of
            ``python -m aiohttp.web`` -- verify against that CLI.

    Returns:
        The configured ``web.Application``.
    """
    application = web.Application()
    # The executor may be a thread pool or a process pool.
    application["workers_pool"] = ThreadPoolExecutor(8)
    routes = (
        ('/quick', handle),
        ('/sleep', sleephandle),
        ('/fast', fast_sleep_answer),
    )
    for path, handler in routes:
        application.router.add_get(path, handler)
    # Make sure the pool is closed when the application shuts down.
    application.on_shutdown.append(on_shutdown)
    return application
# A better way to run the app from the command line
# (assuming this file is named x.py; on Linux the
# interpreter command will usually be python3):
# python -m aiohttp.web -H 0.0.0.0 -P 8080 x:init
if __name__ == '__main__':
    # INFO level so the messages logged by the handlers and the shutdown
    # hook are actually emitted.
    logging.basicConfig(level=logging.INFO)
    application = init()
    web.run_app(application, host="0.0.0.0", port=8080)
All blocking I/O operations are run in a ThreadPoolExecutor. If your tasks are CPU-bound, use a ProcessPoolExecutor instead. I showed two cases: 1) when you cannot answer immediately and need to wait for the result, and 2) when you can answer right away and do all the work in the background.