How to launch / terminate a periodic background task using Python asyncio in a Flask backend app?


I have a Python Flask backend app. I want to create a mechanism that lets some jobs run periodically in the background, such as auto-updating something every minute.

But I am blocked by some errors that keep the backend from behaving as expected.

view_func.py

from flask import Blueprint, request

from async_periodic_job import AsyncPeriodictUtils

infras_bp = Blueprint("infras", __name__)

@infras_bp.route("/infras/autosync", methods=["PUT"])
def auto_sync():
    args = request.args
    check_required_params(args, ["autosnyc"])
    autosnyc = args.get("autosnyc", default="false").lower() == "true"
    if autosnyc:
        AsyncPeriodictUtils.start_task()
    else:
        AsyncPeriodictUtils.stop_task()
    return "success", 200

async_periodic_job.py

import asyncio
from typing import Callable

from utils import logger

SECOND = 1
MINUTE = 60 * SECOND

logger.debug(f"Import {__file__}")

periodic_jobs = dict()
task_instance = None

# Get default event loop in main thread
loop = asyncio.get_event_loop()


class AsyncPeriodictUtils:

    @staticmethod
    async def run_jobs() -> None:
        while True:
            await asyncio.sleep(10)
            logger.info(f"Called run_jobs periodicly.")
            logger.info(f"periodic_jobs: {periodic_jobs.keys()}")
            for func_name, function in periodic_jobs.items():
                function()
                logger.info(f"Called function '{func_name}' periodicly.")

    @classmethod
    def create_task(cls) -> None:
        global task_instance, loop
        task_instance = loop.create_task(cls.run_jobs())

    @staticmethod
    async def cancel_task() -> None:
        global task_instance
        if task_instance:
            task_instance.cancel()
            try:
                await task_instance
            except asyncio.CancelledError:
                logger.info("Async periodic task has been cancelled.")
            task_instance = None
        else:
            logger.warning("Async periodic task has not been started yet.")

    @classmethod
    def start_task(cls) -> None:
        cls.create_task()
        global loop
        try:
            loop.run_until_complete(task_instance)
        except asyncio.CancelledError:
            pass
        logger.info("Async Periodic jobs launched.")

    @classmethod
    def stop_task(cls) -> None:
        global loop
        try:
            loop.run_until_complete(cls.cancel_task())
        except asyncio.CancelledError:
            pass
        logger.info("Async Periodic jobs terminated.")

    @classmethod
    def add_job(cls, function: Callable) -> None:
        if function.__name__ in periodic_jobs:
            return
        periodic_jobs.update({function.__name__: function})
        logger.info(f"Added function {function.__name__} in periodic jobs.")
        global task_instance
        if not task_instance:
            logger.info(f"Auto enable periodic jobs.")
            cls.start_task()

    @classmethod
    def remove_job(cls, function: Callable) -> None:
        if function.__name__ not in periodic_jobs:
            logger.warning(
                f"function {function.__name__} not in periodic jobs.")
            return
        periodic_jobs.pop(function.__name__)
        logger.info(f"Removed function {function.__name__} in periodic jobs.")
        if not periodic_jobs:
            logger.info(f"Periodic jobs list clear, auto terminate.")
            cls.stop_task()

When I call AsyncPeriodictUtils.start_task(), my backend raises an error and responds with 500. The log is shown below:

[2023-10-13-06:06:19][DAAT][ERROR][__init__.py] [Error] stack:
[2023-10-13-06:06:19][DAAT][ERROR][__init__.py] Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/flask/app.py", line 1820, in full_dispatch_request
    rv = self.dispatch_request()
  File "/opt/conda/lib/python3.10/site-packages/flask/app.py", line 1796, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
  File "/opt/conda/lib/python3.10/site-packages/daat-1.0.0-py3.10.egg/daat/app/routes/infras.py", line 134, in auto_sync
    AsyncPeriodictUtils.start_task()
  File "/opt/conda/lib/python3.10/site-packages/daat-1.0.0-py3.10.egg/daat/infras/async_periodic_job.py", line 53, in start_task
    loop.run_until_complete(task_instance)
  File "/opt/conda/lib/python3.10/asyncio/base_events.py", line 625, in run_until_complete
    self._check_running()
  File "/opt/conda/lib/python3.10/asyncio/base_events.py", line 584, in _check_running
    raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running

[2023-10-13-06:06:19][werkzeug][INFO][_internal.py] 172.23.0.3 - - [13/Oct/2023 06:06:19] "PUT /infras/autosync?autosnyc=true HTTP/1.1" 500

But after the step above, the periodic jobs seem to have been launched successfully. The logs are shown below:

[2023-10-13-06:06:23][DAAT][INFO][async_periodic_job.py] Called run_jobs periodicly.
[2023-10-13-06:06:23][DAAT][INFO][async_periodic_job.py] periodic_jobs: dict_keys([])
[2023-10-13-06:06:33][DAAT][INFO][async_periodic_job.py] Called run_jobs periodicly.
[2023-10-13-06:06:33][DAAT][INFO][async_periodic_job.py] periodic_jobs: dict_keys([])
[2023-10-13-06:06:33][DAAT][INFO][async_periodic_job.py] Called run_jobs periodicly.
[2023-10-13-06:06:33][DAAT][INFO][async_periodic_job.py] periodic_jobs: dict_keys([])

When I call AsyncPeriodictUtils.stop_task(), my backend raises an error and responds with 500. The log is shown below:

[2023-10-13-06:06:35][DAAT][ERROR][__init__.py] [Error] stack:
[2023-10-13-06:06:35][DAAT][ERROR][__init__.py] Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/flask/app.py", line 1820, in full_dispatch_request
    rv = self.dispatch_request()
  File "/opt/conda/lib/python3.10/site-packages/flask/app.py", line 1796, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
  File "/opt/conda/lib/python3.10/site-packages/daat-1.0.0-py3.10.egg/daat/app/routes/infras.py", line 136, in auto_sync
    AsyncPeriodictUtils.stop_task()
  File "/opt/conda/lib/python3.10/site-packages/daat-1.0.0-py3.10.egg/daat/infras/async_periodic_job.py", line 62, in stop_task
    loop.run_until_complete(cls.cancel_task())
  File "/opt/conda/lib/python3.10/asyncio/base_events.py", line 625, in run_until_complete
    self._check_running()
  File "/opt/conda/lib/python3.10/asyncio/base_events.py", line 584, in _check_running
    raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running

/opt/conda/lib/python3.10/site-packages/flask/app.py:1822: RuntimeWarning: coroutine 'AsyncPeriodictUtils.cancel_task' was never awaited
  rv = self.handle_user_exception(e)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
[2023-10-13-06:06:35][werkzeug][INFO][_internal.py] 172.23.0.3 - - [13/Oct/2023 06:06:35] "PUT /infras/autosync?autosnyc=false HTTP/1.1" 500 -

But this does not terminate the periodic task in the background as expected. The task keeps running periodically:

[2023-10-13-06:06:43][DAAT][INFO][async_periodic_job.py] Called run_jobs periodicly.
[2023-10-13-06:06:43][DAAT][INFO][async_periodic_job.py] periodic_jobs: dict_keys([])
[2023-10-13-06:06:43][DAAT][INFO][async_periodic_job.py] Called run_jobs periodicly.
[2023-10-13-06:06:43][DAAT][INFO][async_periodic_job.py] periodic_jobs: dict_keys([])
[2023-10-13-06:06:53][DAAT][INFO][async_periodic_job.py] Called run_jobs periodicly.
[2023-10-13-06:06:53][DAAT][INFO][async_periodic_job.py] periodic_jobs: dict_keys([])
[2023-10-13-06:06:53][DAAT][INFO][async_periodic_job.py] Called run_jobs periodicly.
[2023-10-13-06:06:53][DAAT][INFO][async_periodic_job.py] periodic_jobs: dict_keys([])
[2023-10-13-06:07:03][DAAT][INFO][async_periodic_job.py] Called run_jobs periodicly.
[2023-10-13-06:07:03][DAAT][INFO][async_periodic_job.py] periodic_jobs: dict_keys([])
[2023-10-13-06:07:03][DAAT][INFO][async_periodic_job.py] Called run_jobs periodicly.
[2023-10-13-06:07:03][DAAT][INFO][async_periodic_job.py] periodic_jobs: dict_keys([])
[2023-10-13-06:07:13][DAAT][INFO][async_periodic_job.py] Called run_jobs periodicly.
[2023-10-13-06:07:13][DAAT][INFO][async_periodic_job.py] periodic_jobs: dict_keys([])

I've tried many different approaches but still couldn't figure it out.

Could anyone please check my code and point out what I should change, or provide any guidance or direction? Thanks.

I expect:

AsyncPeriodictUtils.start_task(): Launch the periodic task without raising an exception that makes the backend respond with 500.

AsyncPeriodictUtils.stop_task(): Terminate the periodic task without raising an exception that makes the backend respond with 500.


Solution

  • The traceback shows loop.run_until_complete() being called on an event loop that is already running, which asyncio does not allow. You might need to create another loop to perform this extra task, or add the task to the current loop. Refer to this answer for more details.

    However, IMO these periodic tasks should be driven by a cron job (server side) or setInterval (JS, client side). The core idea is to leave the server as it is and call the HTTP API periodically from the server or the client, as per your needs. That way your backend framework only handles HTTP requests and is not mixed with unrelated work.
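
The first suggestion above (a separate loop for the extra task) can be sketched as follows. This is a minimal sketch, not the asker's final code: the PeriodicRunner class and its attribute names are hypothetical, and it assumes that running the jobs on a daemon thread inside the Flask process is acceptable. The private loop runs forever on its own thread, so the request handlers never call run_until_complete(); they only schedule or cancel the periodic coroutine from outside with asyncio.run_coroutine_threadsafe().

```python
import asyncio
import logging
import threading
from concurrent.futures import Future
from typing import Callable, Dict, Optional

logger = logging.getLogger(__name__)


class PeriodicRunner:
    """Hypothetical helper: runs registered jobs every `interval` seconds
    on a private event loop that lives in a daemon thread."""

    def __init__(self, interval: float = 60.0) -> None:
        self.interval = interval
        self.jobs: Dict[str, Callable[[], None]] = {}
        self._lock = threading.Lock()
        self._future: Optional[Future] = None
        # A dedicated loop, separate from any loop the web server may run.
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(
            target=self._loop.run_forever, daemon=True
        )
        self._thread.start()

    async def _run_jobs(self) -> None:
        while True:
            await asyncio.sleep(self.interval)
            # Copy the items so jobs can be added or removed meanwhile.
            for name, job in list(self.jobs.items()):
                try:
                    job()
                except Exception:
                    # One failing job should not kill the periodic task.
                    logger.exception("Periodic job %r failed", name)

    def add_job(self, function: Callable[[], None]) -> None:
        self.jobs.setdefault(function.__name__, function)

    def remove_job(self, function: Callable[[], None]) -> None:
        self.jobs.pop(function.__name__, None)

    def start_task(self) -> bool:
        """Schedule the periodic coroutine; returns False if already running."""
        with self._lock:
            if self._future is not None and not self._future.done():
                return False
            # Thread-safe hand-off to the private loop; does not block.
            self._future = asyncio.run_coroutine_threadsafe(
                self._run_jobs(), self._loop
            )
            return True

    def stop_task(self) -> bool:
        """Cancel the periodic coroutine; returns False if it was not running."""
        with self._lock:
            if self._future is None or self._future.done():
                return False
            # Cancelling this future also cancels the task inside the loop.
            self._future.cancel()
            self._future = None
            return True
```

With a module-level instance such as runner = PeriodicRunner(interval=60), the view in the question would call runner.start_task() and runner.stop_task() in place of the AsyncPeriodictUtils methods; both return immediately. Note that if the app is served by several worker processes, each process gets its own runner and its own copy of the jobs, which is one more reason to consider the cron-based approach.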