Search code examples
python, python-asyncio, aiohttp

python aiohttp behaving like single threaded application


I have the following simple Python web server application. When I trigger a /sleep call, all other calls to /quick are blocked until the sleep ends and the response is returned. I am not sure what is wrong with this code. Can someone provide some clarity?

from aiohttp import web
import asyncio
import time

async def handle(request):
    """Greet the caller by the ``name`` route parameter ("Anonymous" if absent)."""
    who = request.match_info.get('name', "Anonymous")
    return web.Response(text="Hello, " + who)

async def sleephandle(request):
    """Return a greeting after a 12-second synchronous sleep.

    ``time.sleep`` is a blocking call made directly inside the coroutine, so
    the handler does not yield to the event loop while it sleeps. This is the
    behaviour the question is about: requests to ``/quick`` are not served
    until this handler returns.
    """
    name = request.match_info.get('name', "Anonymous")
    # Trivializing here; the actual code has a transition from async to sync.
    time.sleep(12)
    text = "Hello, " + name
    return web.Response(text=text)

async def init(loop):
    """Build the application, register both routes and start listening.

    The server is created on ``loop`` and bound to 127.0.0.1:8000; the
    created server object is returned to the caller.
    """
    application = web.Application(loop=loop)
    for path, handler in (('/quick', handle), ('/sleep', sleephandle)):
        application.router.add_get(path, handler)
    server = await loop.create_server(
        application.make_handler(), '127.0.0.1', 8000
    )
    print('server started')
    return server

def create_server():
    """Start the server on the current event loop, then run the loop forever."""
    event_loop = asyncio.get_event_loop()
    startup = init(event_loop)
    event_loop.run_until_complete(startup)
    event_loop.run_forever()

# Module-level entry point: starts the server and runs the event loop forever.
create_server()

Solution

  • The key idea of my solution is to use loop.run_in_executor with the pool type that fits your case. You can solve the problem in the following way:

    from aiohttp import web
    import asyncio
    import time
    import logging
    from concurrent.futures import ThreadPoolExecutor
    from datetime import datetime
    
    
    def blocking_code(delay: float = 12) -> str:
        """Stand-in for some long-running synchronous code.

        Args:
            delay: Seconds to block the calling thread. Defaults to 12, the
                duration that was previously hard-coded.

        Returns:
            The fixed marker string ``"!!!!"``.
        """
        time.sleep(delay)
        return "!!!!"
    
    
    async def blocking_code_task(loop: asyncio.AbstractEventLoop, request: web.Request):
        """Run ``blocking_code`` in the app's worker pool and log its result.

        Meant to be wrapped in an asyncio Task that nobody awaits, so a failure
        is logged here rather than being left as an unretrieved task exception.

        Args:
            loop: Event loop whose ``run_in_executor`` schedules the work.
            request: Request whose ``app["workers_pool"]`` holds the executor.
        """
        try:
            r = await loop.run_in_executor(request.app["workers_pool"], blocking_code)
        except Exception:
            # Nothing awaits this task, so report the failure with its traceback.
            logging.exception("Background blocking_code task failed")
            return
        logging.info(f"{datetime.now()}: {r}")
    
    
    async def handle(request: web.Request):
        """Answer immediately with a greeting for the ``name`` route parameter."""
        greeting = "Hello, " + request.match_info.get('name', "Anonymous")
        return web.Response(text=greeting)
    
    
    async def sleephandle(request: web.Request):
        """Wait for the blocking work to finish, then send the response.

        The blocking call is handed to the executor stored in
        ``request.app["workers_pool"]`` and awaited, so this coroutine yields
        to the event loop instead of blocking it while the work runs.
        """
        name = request.match_info.get('name', "Anonymous")
        # Inside a coroutine, get_running_loop() is the documented way to
        # obtain the loop that is executing this handler.
        loop = asyncio.get_running_loop()
        # Await the executor future because the result is part of the response.
        r = await loop.run_in_executor(request.app["workers_pool"], blocking_code)
        text = "Hello, " + name + r
        return web.Response(text=text)
    
    
    # Strong references to fire-and-forget tasks. The event loop itself only
    # keeps weak references, so an otherwise unreferenced task could be
    # garbage-collected before it finishes.
    _background_tasks = set()


    async def fast_sleep_answer(request: web.Request):
        """Respond as fast as possible and do the work in a separate asyncio Task.

        The blocking work is started via ``blocking_code_task`` and is not
        awaited, so the response does not wait for its result.
        """
        name = request.match_info.get('name', "Anonymous")
        loop = asyncio.get_running_loop()
        # Use this form if you do not want to wait for the result.
        task = asyncio.create_task(blocking_code_task(loop, request))
        _background_tasks.add(task)
        # Drop the reference once the task is done so the set does not grow.
        task.add_done_callback(_background_tasks.discard)
        text = "Fast answer" + name
        return web.Response(text=text)
    
    
    async def on_shutdown(app):
        """Shut down the worker pool stored on the app, then log that it is closed."""
        workers = app["workers_pool"]
        workers.shutdown()
        logging.info(f"{datetime.now()}: Pool is closed")
    
    
    async def init(args=None):
        """Application factory written for newer aiohttp versions.

        Creates an 8-worker thread pool, registers the three GET routes, stores
        the pool on the app under ``"workers_pool"`` and registers
        ``on_shutdown`` so the pool is closed when the app closes.
        """
        workers = ThreadPoolExecutor(8)
        application = web.Application()
        routes = (
            ('/quick', handle),
            ('/sleep', sleephandle),
            ('/fast', fast_sleep_answer),
        )
        for path, handler in routes:
            application.router.add_get(path, handler)
        # The pool can be a ThreadPool or a ProcessPool.
        application["workers_pool"] = workers
        application.on_shutdown.append(on_shutdown)
        return application
    
    # the better way to run the app
    # (assuming the file is named x.py;
    # on Linux the interpreter command will be python3):
    # python -m aiohttp.web -H 0.0.0.0 -P 8080 x:init
    if __name__ == '__main__':
        # Enable INFO-level logging so the logging.info calls in this file are shown.
        logging.basicConfig(level=logging.INFO)
        application = init()
        web.run_app(application, host="0.0.0.0", port=8080)
    
    

    All blocking I/O operations are run in a ThreadPoolExecutor. If your tasks are CPU-bound, use a ProcessPoolExecutor instead. I showed two cases: 1) when you cannot answer right away and need to wait for the result; 2) when you can answer immediately and do all the work in the background.