django python-3.x celery celerybeat

Celery: what happens to running tasks when using app.control.purge()?


Currently I have a Celery batch running with Django, like so:

Celery.py:

from __future__ import absolute_import, unicode_literals
import os

import django
from celery import Celery
from celery.schedules import crontab
from dotenv import load_dotenv  # provided by the python-dotenv package

load_dotenv(os.path.join(os.path.dirname(os.path.dirname(__file__)), '.env'))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'base.settings')
django.setup()
app = Celery('base')

app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    app.control.purge()
    sender.add_periodic_task(30.0, check_loop.s())
    # recursion_function re-queues itself because it must wait for its loop
    # to finish, and how long that takes can't be predicted
    recursion_function.delay()
    print("setup_periodic_tasks")

@app.task()
def check_loop():
    ...
    # start = <start number stored in the database>
    # end = <end number stored in the database>
    # call the APIs in the list from id=start to id=end
    # create objects
    # update the database (start number = end, end number = end + 3)


@app.task(default_retry_delay=10)  # a task option, so it belongs in the decorator
def recursion_function():
    ...
    # do some looping
    # when finished, call itself again
    recursion_function.apply_async(countdown=30)

My aim is that whenever the Celery file gets edited, all tasks are restarted and any queued tasks that have not yet executed are removed. (I'm doing this because recursion_function runs itself again once it has finished checking every record of a table in my database, so I'm not worried about it stopping midway.)

The check_loop function calls an API that uses paging to return a list of objects, and I compare each object with the records in a table; if one matches, I create a new custom record of another model.

My question is: when I purge all messages, will the currently running task be stopped midway, or will it keep running? If check_loop stops midway through the API list, it will run the loop again and create duplicate records, which I don't want.

EXAMPLE:

While check_loop() is running, it creates objects midway (on the API list from element id=2 to id=5). The server restarts and runs again, so check_loop() now runs from the beginning (on the API list from element id=2 to id=5) and creates objects from that list again, which I definitely don't want.

Is this how it runs? I just need a confirmation.

EDIT:

https://docs.celeryproject.org/en/4.4.1/faq.html#how-do-i-purge-all-waiting-tasks

I added app.control.purge() because, on restart, setup_periodic_tasks calls recursion_function again while the previous recursion_function, queued by recursion_function.apply_async(countdown=30), also executes, so it multiplied itself.


Solution

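  • Yes, the worker will continue executing the currently running task unless the worker itself is restarted. app.control.purge() only discards messages that are still waiting in the queue; it does not touch tasks a worker is already executing.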

    Also, the Celery way is to always expect tasks to run in a concurrent environment, with the following considerations:

    • there are many tasks running concurrently
    • there are many celery workers executing tasks
    • same task may run again
    • multiple instances of the same task may run at the same moment
    • any task may be terminated at any time

    Even if you are sure that your environment has only one worker, started and stopped manually, and none of this applies, tasks should still be written so that all of it can happen.

    Some useful techniques:

    • use database transactions
    • use locking
    • split long-running tasks into faster ones
    • if a task has intermediate values that must be saved or are important (i.e. non-reproducible, like some API calls) and processing them in the next step takes time, consider splitting it into several chained tasks
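A minimal sketch of that last point, under stated assumptions: sqlite3 stands in for the real database, `call_api` is a hypothetical stand-in for the paged API, and the two steps are plain functions so the example runs without a broker. In Celery they would typically be two tasks linked with `celery.chain`, with the first task's return value passed to the second. The point is that the non-reproducible API result is stored before any processing happens, so a re-run never calls the API twice for the same page.

```python
import json
import sqlite3
from typing import Callable, List


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE api_page (page_id INTEGER PRIMARY KEY, payload TEXT NOT NULL);
        CREATE TABLE record (external_id INTEGER PRIMARY KEY);
        """
    )
    return conn


def fetch_step(conn: sqlite3.Connection, page_id: int,
               call_api: Callable[[int], List[int]]) -> int:
    """Step 1: call the non-reproducible API at most once per page and store the raw result."""
    stored = conn.execute("SELECT 1 FROM api_page WHERE page_id = ?", (page_id,)).fetchone()
    if stored is None:
        items = call_api(page_id)
        with conn:  # commit the raw payload before any processing starts
            conn.execute(
                "INSERT INTO api_page (page_id, payload) VALUES (?, ?)",
                (page_id, json.dumps(items)),
            )
    return page_id  # what a chained task would pass on to the next one


def process_step(conn: sqlite3.Connection, page_id: int) -> int:
    """Step 2: create records from the stored payload only, so re-running it is safe."""
    (payload,) = conn.execute(
        "SELECT payload FROM api_page WHERE page_id = ?", (page_id,)
    ).fetchone()
    with conn:
        for external_id in json.loads(payload):
            # ignore rows that a previous run already created
            conn.execute("INSERT OR IGNORE INTO record (external_id) VALUES (?)", (external_id,))
    return conn.execute("SELECT COUNT(*) FROM record").fetchone()[0]


calls: List[int] = []


def fake_api(page_id: int) -> List[int]:
    # hypothetical API: each page returns three element ids
    calls.append(page_id)
    return [page_id * 10 + i for i in range(3)]


conn = make_db()
first = process_step(conn, fetch_step(conn, 1, fake_api))
second = process_step(conn, fetch_step(conn, 1, fake_api))  # simulated re-run after a restart
print(calls, first, second)  # [1] 3 3
```

The re-run finds the stored page, skips the API call, and creates no duplicates.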

    If you need to run only one instance of a task at a time, use some sort of locking: create or update a lock record in the database or in the cache, so other instances of the same task can check it, see that the task is already running, and either just return or wait for the previous one to complete.
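A minimal sketch of a database lock record, assuming a hypothetical `task_lock` table (sqlite3 in autocommit mode stands in for the real database, and the 600-second expiry is an arbitrary example value). The PRIMARY KEY makes the insert fail when another instance already holds the lock, so checking and acquiring happen in one atomic step; the expiry covers a worker that was killed mid-task and would otherwise leave the lock behind forever. With Django's cache, `cache.add(key, value, timeout)` can play the same role, since it only stores the key if it is not already present (atomic on backends such as Memcached).

```python
import sqlite3
import time


def make_lock_table() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:", isolation_level=None)  # autocommit: each statement commits at once
    conn.execute("CREATE TABLE task_lock (name TEXT PRIMARY KEY, acquired_at REAL NOT NULL)")
    return conn


def acquire_lock(conn: sqlite3.Connection, name: str, expire_after: float = 600.0) -> bool:
    """Try to take the lock; return False if another instance already holds it."""
    now = time.time()
    # Drop a stale lock left behind by a worker that was killed mid-task.
    conn.execute("DELETE FROM task_lock WHERE name = ? AND acquired_at < ?", (name, now - expire_after))
    try:
        # The PRIMARY KEY makes this insert fail if the lock row already exists.
        conn.execute("INSERT INTO task_lock (name, acquired_at) VALUES (?, ?)", (name, now))
        return True
    except sqlite3.IntegrityError:
        return False


def release_lock(conn: sqlite3.Connection, name: str) -> None:
    conn.execute("DELETE FROM task_lock WHERE name = ?", (name,))


def recursion_function_body(conn: sqlite3.Connection) -> str:
    """Hypothetical task body guarded by the lock."""
    if not acquire_lock(conn, "recursion_function"):
        return "skipped"  # another instance is running, so just return
    try:
        # ... do the looping here ...
        return "ran"
    finally:
        release_lock(conn, "recursion_function")  # always release, even on error


conn = make_lock_table()
first = acquire_lock(conn, "recursion_function")
second = acquire_lock(conn, "recursion_function")  # a second instance while the first holds it
while_held = recursion_function_body(conn)
release_lock(conn, "recursion_function")
after_release = recursion_function_body(conn)
print(first, second, while_held, after_release)  # True False skipped ran
```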

    For example, recursion_function could also be a periodic task. Being a periodic task ensures it runs every interval, even if a previous run fails for any reason (and therefore fails to queue itself again, as a regular non-periodic task would). With locking, you can make sure only one instance runs at a time.
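A sketch of the periodic-task variant, assuming the task's registered name is `base.celery.recursion_function` (an assumption: check the name your worker lists under `[tasks]` at startup). Because the app calls `app.config_from_object('django.conf:settings', namespace='CELERY')`, a `CELERY_BEAT_SCHEDULE` setting in Django's settings becomes Celery's `beat_schedule`. With beat scheduling the task, the `recursion_function.delay()` call in `setup_periodic_tasks` and the self-queuing `apply_async(countdown=30)` are no longer needed, and a lock inside the task keeps overlapping runs from doing the work twice.

```python
# settings.py (sketch): have celery beat run recursion_function every 30 seconds
CELERY_BEAT_SCHEDULE = {
    "recursion-function-every-30s": {
        # assumed task name; use the name your worker lists at startup
        "task": "base.celery.recursion_function",
        "schedule": 30.0,  # seconds
    },
}
```

The same effect is available from code with `sender.add_periodic_task(30.0, recursion_function.s())`, mirroring how `check_loop` is already scheduled.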


    check_loop():

    First, it is recommended to save the results in one database transaction, to make sure that either everything or nothing is saved or modified.
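A minimal sketch of that idea applied to the check_loop batch from the question, assuming a hypothetical `progress` table that holds start/end and a `record` table for the created objects, with sqlite3 standing in for the real database (in Django the equivalent block would be `with transaction.atomic():`). The inserts and the start/end update share one transaction, so a crash midway leaves neither partial records nor an advanced cursor, and the re-run repeats the same batch cleanly. The `fail_after` parameter exists only to simulate a crash.

```python
import sqlite3
from typing import Callable, List, Optional


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE progress (id INTEGER PRIMARY KEY, start_id INTEGER, end_id INTEGER);
        CREATE TABLE record (external_id INTEGER PRIMARY KEY);
        INSERT INTO progress (id, start_id, end_id) VALUES (1, 2, 5);
        """
    )
    return conn


def check_loop_batch(conn: sqlite3.Connection,
                     fetch_items: Callable[[int, int], List[int]],
                     fail_after: Optional[int] = None) -> None:
    start, end = conn.execute("SELECT start_id, end_id FROM progress WHERE id = 1").fetchone()
    items = fetch_items(start, end)
    with conn:  # one transaction: commit on success, roll back on any exception
        for n, external_id in enumerate(items):
            if fail_after is not None and n == fail_after:
                raise RuntimeError("simulated crash in the middle of the loop")
            conn.execute("INSERT INTO record (external_id) VALUES (?)", (external_id,))
        # advance start/end in the same transaction as the inserts (start = end, end = end + 3)
        conn.execute("UPDATE progress SET start_id = ?, end_id = ? WHERE id = 1", (end, end + 3))


def count_records(conn: sqlite3.Connection) -> int:
    return conn.execute("SELECT COUNT(*) FROM record").fetchone()[0]


def fake_api(start: int, end: int) -> List[int]:
    # hypothetical API returning element ids start .. end - 1
    return list(range(start, end))


conn = make_db()
try:
    check_loop_batch(conn, fake_api, fail_after=2)
except RuntimeError:
    pass
after_crash = count_records(conn)  # the two rows inserted before the crash were rolled back
check_loop_batch(conn, fake_api)   # the re-run starts from the same start/end
after_rerun = count_records(conn)
position = conn.execute("SELECT start_id, end_id FROM progress WHERE id = 1").fetchone()
print(after_crash, after_rerun, position)  # 0 3 (5, 8)
```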

    You can also save a marker that records how many objects were saved or their status, so future tasks only need to check this marker rather than each object.
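A minimal sketch of such a marker, assuming a hypothetical `batch_marker` table keyed by the batch's start/end ids that stores how many objects were saved (sqlite3 again stands in for the real database). The marker is written in the same transaction as the objects, so it exists only if they were all saved, and a later run needs one lookup instead of checking every object.

```python
import sqlite3
from typing import List

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE record (external_id INTEGER PRIMARY KEY);
    CREATE TABLE batch_marker (
        start_id INTEGER NOT NULL,
        end_id INTEGER NOT NULL,
        saved INTEGER NOT NULL,  -- how many objects this batch created
        PRIMARY KEY (start_id, end_id)
    );
    """
)


def process_batch(conn: sqlite3.Connection, start: int, end: int, items: List[int]) -> int:
    """Create the batch's objects unless its marker exists; return how many were created now."""
    marker = conn.execute(
        "SELECT saved FROM batch_marker WHERE start_id = ? AND end_id = ?", (start, end)
    ).fetchone()
    if marker is not None:
        return 0  # batch already done: one lookup instead of checking every object
    with conn:  # objects and marker are committed together, or not at all
        conn.executemany("INSERT INTO record (external_id) VALUES (?)", [(i,) for i in items])
        conn.execute(
            "INSERT INTO batch_marker (start_id, end_id, saved) VALUES (?, ?, ?)",
            (start, end, len(items)),
        )
    return len(items)


first = process_batch(conn, 2, 5, [2, 3, 4])
second = process_batch(conn, 2, 5, [2, 3, 4])  # re-run of the same batch after a restart
print(first, second)  # 3 0
```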

    Alternatively, check for each element, before creating it, whether it already exists in the database.
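A minimal sketch of the per-element check, assuming a hypothetical `record` table whose `external_id` column holds the id of the API element. Doing the check as a single `INSERT OR IGNORE` against a UNIQUE column, rather than a separate SELECT followed by an INSERT, means two concurrent runs cannot both pass the check. In Django, a `unique=True` field combined with `Model.objects.get_or_create(...)` gives a similar result.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# UNIQUE: the database itself refuses a second row for the same API element.
conn.execute(
    "CREATE TABLE record (id INTEGER PRIMARY KEY, external_id INTEGER NOT NULL UNIQUE)"
)


def create_if_missing(conn: sqlite3.Connection, external_id: int) -> bool:
    """Check and create in one statement; return True if a new row was created."""
    with conn:
        cur = conn.execute(
            "INSERT OR IGNORE INTO record (external_id) VALUES (?)", (external_id,)
        )
    return cur.rowcount == 1  # 0 when the row already existed and the insert was ignored


created = [create_if_missing(conn, i) for i in [2, 3, 4, 3, 2]]
print(created)  # [True, True, True, False, False]
```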