Let's say I have a web app served by Uvicorn. The app implements a GraphQL API with a mutation that starts a long calculation on the server side and a query that reports the status of the server. Say we want to know how many tasks are running in the background. Here is my simplified code, which does not work:
import asyncio
import logging
import time
import ariadne
import ariadne.asgi
import uvicorn
import starlette as sl
import starlette.applications
# Ariadne resolver registries; resolvers attach to them via ``.field(...)`` and
# main() passes both to make_executable_schema().
query_t = ariadne.QueryType()
mutation_t = ariadne.MutationType()
# Module-level list holding one marker per long task in progress; the ``ping``
# resolver reports its length.
FIFO = []
async def long_task():
    """Mock a long calculation while tracking it in the global ``FIFO``.

    A marker is prepended to ``FIFO`` on entry and one element is popped from
    its end once the five-second sleep is over.
    """
    global FIFO
    print('Starting long task...')
    FIFO = [1] + FIFO
    # mock long calc with 5sec sleep
    # NOTE: time.sleep() is a plain synchronous call, not an ``await``, so the
    # event loop running this coroutine cannot do anything else until it ends.
    time.sleep(5)
    FIFO.pop()
    print('Long task finished!')
# Strong references to in-flight background tasks. The event loop only keeps
# weak references to tasks, so a task that nothing else holds may be
# garbage-collected before it finishes.
_background_tasks = set()


@mutation_t.field('longTask')
async def resolve_long_task(_parent, info):
    """Schedule ``long_task`` as a background task and return immediately.

    The task is scheduled on the event loop that is running this resolver;
    the resolver does not wait for it.

    Returns:
        An empty dict, used as the ``longTaskResponse`` payload.
    """
    print('Start to resolve long task...')
    task = asyncio.create_task(long_task())
    # Hold the task until it completes, then drop the reference.
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    print('Resolve finished!')
    return {}
@query_t.field('ping')
async def resolve_ping(_parent, info):
    """Return a status string with the current number of entries in ``FIFO``."""
    pending = len(FIFO)
    return f'FIFO has {pending} elements'
def main():
    """Build the GraphQL schema, mount it under ``/graphql`` and serve it.

    Uvicorn listens on all interfaces, port 9002, logging errors only.
    """
    type_defs = ariadne.gql('''
type Mutation{
longTask: longTaskResponse
}
type longTaskResponse {
message: String
}
type Query {
ping: String
}
''')
    executable_schema = ariadne.make_executable_schema(type_defs, query_t, mutation_t)
    graphql_app = ariadne.asgi.GraphQL(executable_schema)
    routes = [sl.routing.Mount('/graphql', graphql_app)]
    app = sl.applications.Starlette(routes=routes)
    server_options = {'host': '0.0.0.0', 'port': 9002, 'log_level': 'error'}
    uvicorn.run(app, **server_options)
# Start the server only when this file is executed as a script.
if __name__ == '__main__':
    main()
After running
$ python main.py
I send a mutation in the GraphQL GUI in the first tab:
mutation longTaskQueue{
longTask {
message
}
}
In the second tab, I try to retrieve the length of the FIFO:
query ping {
ping
}
It seems that it's possible to start two `long_task` runs, but `ping` waits until every `long_task` has finished. My general question is: how can I run heavy code in the background without blocking the GraphQL API?
After many attempts I got it working: I can now put many tasks in the background and track how many there are (the API no longer freezes on one long task). The blocking computations are run in a thread pool, each inside its own event loop:
import asyncio
import logging
import time
import ariadne
import ariadne.asgi
import uvicorn
import starlette as sl
import starlette.applications
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
# Ariadne resolver registries; resolvers attach to them via ``.field(...)`` and
# main() passes both to make_executable_schema().
query_t = ariadne.QueryType()
mutation_t = ariadne.MutationType()
# Module-level list holding one marker per long task in progress; the ``ping``
# resolver reports its length.
FIFO = []
async def long_task():
    """Mock a long blocking calculation and track it in the shared ``FIFO``.

    The body never awaits; it is a coroutine function only so that ``run``
    can drive it on a private event loop inside a worker thread. A marker is
    put at the front of ``FIFO`` on entry and one element is popped from the
    end when the work is over.
    """
    print('Starting long task...')
    # Mutate the list in place rather than rebinding the global. Several
    # worker threads run this function at once, and "read FIFO, build a new
    # list, rebind" can lose another thread's update, after which a later
    # pop() may hit an empty list.
    FIFO.insert(0, 1)
    try:
        # mock long calc with 5sec sleep
        time.sleep(5)
    finally:
        # Remove the marker even if the calculation raises, so the count
        # reported by ``ping`` does not drift upwards.
        FIFO.pop()
    print('Long task finished!')
def run(corofn, *args):
    """Run a coroutine function to completion on a fresh event loop.

    Intended to be called from a thread that has no running event loop
    (for example an executor worker).

    Args:
        corofn: Coroutine function to call.
        *args: Positional arguments forwarded to ``corofn``.

    Returns:
        Whatever the coroutine returns.

    Raises:
        Any exception raised by the coroutine propagates to the caller.
    """
    # asyncio.run() creates the loop, runs the coroutine, closes the loop and
    # clears it from the thread, so a reused pool thread is not left with a
    # closed loop installed as its current one.
    return asyncio.run(corofn(*args))
# One worker pool shared by every request. Building a new executor per call
# leaves a never-shut-down pool behind each time; sharing one also makes
# ``max_workers`` an actual cap: at most five long tasks run at once and
# further ones wait in the pool's queue.
_LONG_TASK_EXECUTOR = ThreadPoolExecutor(max_workers=5)


@mutation_t.field('longTask')
async def resolve_long_task(_parent, info):
    """Hand ``long_task`` to the worker pool and return immediately.

    The blocking work runs in a pool thread (through ``run``), so the event
    loop serving GraphQL requests stays responsive.

    Returns:
        An empty dict, used as the ``longTaskResponse`` payload.
    """
    loop = asyncio.get_running_loop()
    print('Start to resolve long task...')
    # The pool is passed explicitly, so the loop's default executor is left
    # alone (it must be a ThreadPoolExecutor on current Python versions).
    # The returned future is deliberately not awaited: the mutation only
    # starts the work.
    loop.run_in_executor(_LONG_TASK_EXECUTOR, run, long_task)
    print('Resolve finished!')
    return {}
@query_t.field('ping')
async def resolve_ping(_parent, info):
    """Return a status string with the current number of entries in ``FIFO``."""
    pending = len(FIFO)
    return f'FIFO has {pending} elements'
def main():
    """Build the GraphQL schema, mount it under ``/graphql`` and serve it.

    Uvicorn listens on all interfaces, port 9002, logging errors only.
    """
    type_defs = ariadne.gql('''
type Mutation{
longTask: longTaskResponse
}
type longTaskResponse {
message: String
}
type Query {
ping: String
}
''')
    executable_schema = ariadne.make_executable_schema(type_defs, query_t, mutation_t)
    graphql_app = ariadne.asgi.GraphQL(executable_schema)
    routes = [sl.routing.Mount('/graphql', graphql_app)]
    app = sl.applications.Starlette(routes=routes)
    server_options = {'host': '0.0.0.0', 'port': 9002, 'log_level': 'error'}
    uvicorn.run(app, **server_options)
# Start the server only when this file is executed as a script.
if __name__ == '__main__':
    main()
The solution was inspired by this answer.