
Async Method Not Executed in Background in Flask Application


I have a GraphQL API built with the Flask and Graphene packages, running on Python 3.5.4. One of the GraphQL mutations takes some time to execute (2 to 5 minutes) and I don't want the end user to wait for it to complete. I would like the mutation method to run in the background and a message to be returned to the user immediately.

I have looked into the asyncio package, but for some reason the execution still happens in the foreground and my script waits for it. Would you have any idea what I'm doing wrong? The script is quite long, so I've put a summary below with the key elements related to asyncio.

File mutation_migration_plan.py

from migration_script import Migration, main
import asyncio
[...]

class executeMigrationPlan(graphene.Mutation):
    """Mutation to execute a migration plan."""
    [...]

    @staticmethod
    def mutate(root, info, input):
        [...]
        # Execute migration asynchronously
        print('Execute migration asynchronously')
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main(migration_plan))
        print('Migration plan execution has started. You will receive an e-mail when it is terminated.')

        ok = True
        message = 'Migration plan execution has started. You will receive an e-mail when it is terminated.'
        return executeMigrationPlan(ok=ok, message=message)

File migration_script.py

class Migration():
    """Class to execute migration of Plan, Step, Object."""

    @staticmethod
    async def migrate(migration_plan, migration_step=None, migration_object=None):
        [...]

async def main(migration_plan, migration_step=None, migration_object=None):
    asyncio.ensure_future(Migration.migrate(migration_plan, migration_step, migration_object))

Basically, I'm expecting to see the print('Migration plan execution has started. You will receive an e-mail when it is terminated.') output almost instantly in my console window while loop.run_until_complete(main(migration_plan)) is still running. Right now that's not the case: the print appears only at the end of the execution.
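The behaviour described above can be reproduced in isolation. The following is a minimal sketch, not the actual migration code: migrate here is a hypothetical stand-in that blocks with time.sleep and never awaits anything, which is assumed to resemble what the real Migration.migrate does. It uses asyncio.new_event_loop() instead of asyncio.get_event_loop() only so that it also runs on recent Python versions.

```python
import asyncio
import time

events = []


async def migrate():
    # Hypothetical stand-in for Migration.migrate: it blocks with time.sleep
    # and contains no await, so it never hands control back to the event loop.
    time.sleep(0.2)
    events.append('migration finished')


async def main():
    # Same pattern as in migration_script.py: schedule the task, then return.
    asyncio.ensure_future(migrate())


loop = asyncio.new_event_loop()
start = time.monotonic()
loop.run_until_complete(main())
elapsed = time.monotonic() - start
events.append('message printed')
loop.close()

print(events, round(elapsed, 2))
```

In this sketch the scheduled task runs on the same thread as the caller, so the blocking sleep has to finish before run_until_complete returns and the message is recorded last.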

[UPDATE]

Following the answer from @Vincent below, I have updated the first file, mutation_migration_plan.py, to use ThreadPoolExecutor and removed everything related to asyncio from both files.

File mutation_migration_plan.py

from migration_script import Migration
from concurrent.futures import ThreadPoolExecutor
[...]

class executeMigrationPlan(graphene.Mutation):
    """Mutation to execute a migration plan."""
    [...]

    @staticmethod
    def mutate(root, info, input):
        [...]
        # Execute migration asynchronously
        print('Execute migration asynchronously')
        executor = ThreadPoolExecutor(max_workers=1)
        future = executor.submit(Migration.migrate, migration_plan)
        # print(future.result())
        print('Migration plan execution has started. You will receive an e-mail when it is terminated.')

        ok = True
        message = 'Migration plan execution has started. You will receive an e-mail when it is terminated.'
        return executeMigrationPlan(ok=ok, message=message)

My script runs fine when I add the print(future.result()) line, but then it does not execute in the background (which makes sense, since printing the result waits for it). However, when I comment out the print, my method Migration.migrate does not seem to execute properly (I know this because I'm not seeing the results in my database). Any idea why?

[UPDATE BIS]

I have been able to execute my method asynchronously by using ProcessPoolExecutor and removing all references to asyncio from both files. See the following code:

File mutation_migration_plan.py

from concurrent.futures import ProcessPoolExecutor
[...]

class executeMigrationPlan(graphene.Mutation):
    """Mutation to execute a migration plan."""
    [...]

    @staticmethod
    def mutate(root, info, input):
        [...]
        # Execute migration asynchronously
        print('Execute migration asynchronously')
        executor = ProcessPoolExecutor()
        executor.submit(Migration.migrate, migration_plan.id)
        print('Migration plan execution has started. You will receive an e-mail when it is terminated.')

        ok = True
        message = 'Migration plan execution has started. You will receive an e-mail when it is terminated.'
        return executeMigrationPlan(ok=ok, message=message)

It's working, but although the process executes in the background, my Flask app takes a very long time to send the HTTP response, and the response is sometimes empty.


Solution

  • It's a common misconception, but you can't plug asyncio into an existing application and expect it to work. In asyncio, every blocking call must use the await syntax in the context of a coroutine. That's the only way it can achieve single-threaded concurrency. This means you would have to use aiohttp instead of flask, along with a library like aiohttp-graphql.

    This would require a major rewrite of your application. If you don't want to go through that, there are other solutions that integrate well with flask. You could use celery, as pointed out by @dirn, or one of the executors provided by concurrent.futures.
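As an illustration of the concurrent.futures option, here is a minimal sketch rather than a definitive implementation. The names run_migration, report and start_migration are hypothetical, and run_migration only simulates the work that Migration.migrate would do. The executor is created once at module level instead of inside the request handler, and a done callback logs any exception raised in the worker, since such exceptions are stored on the future and stay invisible unless result() or exception() is called.

```python
import logging
import time
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO)
log = logging.getLogger('migration')

# One executor for the whole process, created at import time
# rather than once per request.
executor = ThreadPoolExecutor(max_workers=1)


def run_migration(plan_id):
    # Hypothetical stand-in for Migration.migrate.
    time.sleep(0.1)
    if plan_id < 0:
        raise ValueError('unknown migration plan')
    return 'plan {} migrated'.format(plan_id)


def report(future):
    # Called when the job finishes; surfaces errors that would
    # otherwise stay hidden inside the future.
    if future.cancelled():
        return
    error = future.exception()
    if error is not None:
        log.error('Migration failed: %r', error)
    else:
        log.info('Migration done: %s', future.result())


def start_migration(plan_id):
    # Submit the job and return immediately without waiting for it.
    future = executor.submit(run_migration, plan_id)
    future.add_done_callback(report)
    return future


ok_future = start_migration(1)
bad_future = start_migration(-1)
print('Migration plan execution has started.')
```

In a mutation, start_migration(migration_plan.id) would be called just before returning the response. The worker thread runs outside the Flask request, so anything tied to the request context may need to be created inside the worker, which is one reason to pass an id rather than a loaded object.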