python, python-3.x, graphql, python-asyncio, uvicorn

How to use Uvicorn with asyncio.create_task() to put task in background?


Let's say I have a web app served by Uvicorn. The app implements a GraphQL API with a mutation that starts a long calculation on the server side and a query that reports the server's status, for example how many tasks are currently running in the background. Here is my simplified code, which does not work:

import asyncio
import logging
import time
import ariadne
import ariadne.asgi
import uvicorn
import starlette as sl
import starlette.applications

query_t = ariadne.QueryType()
mutation_t = ariadne.MutationType()

FIFO = []

async def long_task():
    print('Starting long task...')
    global FIFO
    FIFO = [1, *FIFO]
    # mock long calc with 5sec sleep
    time.sleep(5)
    FIFO.pop()
    print('Long task finished!')


@mutation_t.field('longTask')
async def resolve_long_task(_parent, info):
    print('Start to resolve long task...')
    asyncio.create_task(long_task())
    print('Resolve finished!')
    return {}

@query_t.field('ping')
async def resolve_ping(_parent, info):
    return f'FIFO has {len(FIFO)} elements'


def main():
    schema_str = ariadne.gql('''
    type Mutation{
        longTask: longTaskResponse
    }
    type longTaskResponse {
        message: String
    }
    type Query {
        ping: String
    }
    ''')
    schema = ariadne.make_executable_schema(schema_str, query_t, mutation_t)
    gql_app = ariadne.asgi.GraphQL(schema)
    app = sl.applications.Starlette(routes=[sl.routing.Mount('/graphql', gql_app)])
    uvicorn.run(app,
                host='0.0.0.0',
                port=9002,
                log_level='error')


if __name__ == '__main__':
    main()

After running

$ python main.py

I send a mutation in the GraphQL GUI in the first tab:

mutation longTaskQueue{
  longTask {
    message
  }
}

In the second tab, I try to retrieve the length of the FIFO:

query ping {
  ping
}

It seems possible to start two long_task calls, but ping does not respond until every long_task has finished. My general question: how can I run heavy code in the background without blocking the GraphQL API?
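
The blocking behaviour described above can be reproduced without Uvicorn or GraphQL. The following is a minimal sketch, not the code from the question: the names blocking_task, cooperative_task and measure_ping_delay are hypothetical, and the sleep is shortened to 0.5 seconds. It measures how long a trivial "ping" has to wait while a task created with asyncio.create_task() is running.

```python
import asyncio
import time


async def blocking_task():
    # time.sleep() never yields to the event loop, so nothing else can run
    # on that loop until it returns.
    time.sleep(0.5)


async def cooperative_task():
    # asyncio.sleep() suspends this coroutine and lets the loop do other work.
    await asyncio.sleep(0.5)


async def measure_ping_delay(task_fn):
    """Return how long a trivial 'ping' waits while task_fn runs as a task."""
    task = asyncio.create_task(task_fn())
    start = time.perf_counter()
    # Hand control back to the loop once, as an incoming request would.
    await asyncio.sleep(0)
    delay = time.perf_counter() - start
    await task
    return delay
```

Calling asyncio.run(measure_ping_delay(blocking_task)) should report a delay of roughly half a second, while the cooperative version returns almost immediately. create_task() only schedules a coroutine on the same event loop; it does not move blocking code off that loop.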


Solution

  • After many attempts I got it working: I can now put many tasks in the background and track how many are running, and the API no longer freezes on a single long task. The blocking computation is run in a thread pool via loop.run_in_executor(), so time.sleep() no longer blocks the server's event loop:

    import asyncio
    import logging
    import time
    import ariadne
    import ariadne.asgi
    import uvicorn
    import starlette as sl
    import starlette.applications
    from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
    
    query_t = ariadne.QueryType()
    mutation_t = ariadne.MutationType()
    
    FIFO = []
    
    async def long_task():
        print('Starting long task...')
        global FIFO
        FIFO = [1, *FIFO]
        # mock long calc with 5sec sleep
        time.sleep(5)
        FIFO.pop()
        print('Long task finished!')
    
    
    def run(corofn, *args):
        loop = asyncio.new_event_loop()
        try:
            coro = corofn(*args)
            asyncio.set_event_loop(loop)
            return loop.run_until_complete(coro)
        finally:
            loop.close()
    
    
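    # One pool shared by all requests, instead of a new pool on every request.
    EXECUTOR = ThreadPoolExecutor(max_workers=5)
    
    
    @mutation_t.field('longTask')
    async def resolve_long_task(_parent, info):
        loop = asyncio.get_running_loop()
        print('Start to resolve long task...')
        # Run the blocking coroutine in a worker thread with its own event loop,
        # so the server's event loop stays free to answer other requests.
        loop.run_in_executor(EXECUTOR, run, long_task)
        print('Resolve finished!')
        return {}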
    
    @query_t.field('ping')
    async def resolve_ping(_parent, info):
        return f'FIFO has {len(FIFO)} elements'
    
    
    def main():
        schema_str = ariadne.gql('''
        type Mutation{
            longTask: longTaskResponse
        }
        type longTaskResponse {
            message: String
        }
        type Query {
            ping: String
        }
        ''')
        schema = ariadne.make_executable_schema(schema_str, query_t, mutation_t)
        gql_app = ariadne.asgi.GraphQL(schema)
        app = sl.applications.Starlette(routes=[sl.routing.Mount('/graphql', gql_app)])
        uvicorn.run(app,
                    host='0.0.0.0',
                    port=9002,
                    log_level='error')
    
    
    if __name__ == '__main__':
        main()
    

    This solution was inspired by this answer.
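
The same idea can be sketched without Ariadne or Uvicorn. This variant is an assumption-laden illustration rather than the code above: it makes the long task a plain blocking function (so no extra event loop is needed in the worker thread), and it replaces the FIFO list with a lock-protected counter, since the workers run in separate threads. The names EXECUTOR, long_task_sync, running_count, start_long_task and demo are all hypothetical.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# One pool shared by every request (hypothetical name).
EXECUTOR = ThreadPoolExecutor(max_workers=5)
_running = 0
_lock = threading.Lock()


def long_task_sync(seconds=5):
    """Blocking work; the counter is guarded because this runs in worker threads."""
    global _running
    with _lock:
        _running += 1
    try:
        time.sleep(seconds)  # stand-in for the long calculation
    finally:
        with _lock:
            _running -= 1


def running_count():
    """Number of long tasks currently executing."""
    with _lock:
        return _running


async def start_long_task(seconds=5):
    """Submit the blocking function to the pool and return its future at once."""
    loop = asyncio.get_running_loop()
    return loop.run_in_executor(EXECUTOR, long_task_sync, seconds)


async def demo():
    # Start two short tasks, check the counter while they run, then after.
    futures = [await start_long_task(0.3) for _ in range(2)]
    await asyncio.sleep(0.1)
    count_during = running_count()
    await asyncio.gather(*futures)
    return count_during, running_count()
```

Running asyncio.run(demo()) should show two tasks in flight and none afterwards. A thread pool suits work that sleeps or waits on I/O; for CPU-bound calculations in CPython, a ProcessPoolExecutor passed to run_in_executor() is the usual alternative, in which case a module-level counter like this one would not be shared between processes.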