Currently i have a celery batch running with django like so:
Celery.py:
from __future__ import absolute_import, unicode_literals
import os
import celery
from celery import Celery
from celery.schedules import crontab
import django
load_dotenv(os.path.join(os.path.dirname(os.path.dirname(__file__)), '.env'))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'base.settings')
django.setup()
app = Celery('base')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
app.control.purge()
sender.add_periodic_task(30.0, check_loop.s())
recursion_function.delay() #need to use recursive because it need to wait for loop to finish(time can't be predict)
print("setup_periodic_tasks")
@app.task()
def check_loop():
.....
start = database start number
end = database end number
callling apis in a list from id=start to id=end
create objects
update database(start number = end, end number = end + 3)
....
@app.task()
def recursion_function(default_retry_delay=10):
.....
do some looping
....
#when finished, call itself again
recursion_function.apply_async(countdown=30)
My aim is whenever the celery file get edited then it would restart all the task -remove queued task that not yet execute(i'm doing this because recursion_function
will run itself again if it finished it's job of checking each record of a table in my database so i'm not worry about it stopping mid way).
The check_loop
function will call to an api that has paging to return a list of objects and i will compare it to by record in a table , if match then create a new custom record of another model
My question is when i purge all messages, will the current running task get stop midway or it gonna keep running ? because if the check_loop
function stop midway looping through the api list then it will run the loop again and i will create new duplicate record which i don't want
EXAMPLE:
during ruuning task of check_loop()
it created object midway (on api list from element id=2 to id=5), server restart -> run again, now check_loop()
run from beginning(on api list from element id=2 to id=5) and created object from that list again(which 100% i don't want)
Is this how it run ? i just need a confirmation
EDIT:
https://docs.celeryproject.org/en/4.4.1/faq.html#how-do-i-purge-all-waiting-tasks
I added app.control.purge()
because when i restart then recursion_function
get called again in setup_periodic_tasks
while previous recursion_function
from recursion_function.apply_async(countdown=30)
execute too so it multiplied itself
Yes, worker will continue execution of currently running task unless worker is also restarted.
Also, The Celery Way is to always expect tasks to be run in concurrent environment with following considerations:
even if you are sure that in your environment there is only one worker started / stopped manually and these do not apply - tasks should be created in such way to allow everything of this to happen.
Some useful techniques:
If you need to run only one instance of a task at a time - use some sort of locking - create / update lock-record in the database or in the cache so others (same tasks) can check and know this task is running and just return or wait for previous one to complete.
I.e. recursion_function
can also be Periodic Task
. Being periodic task will make sure it is run every interval, even if previous one fails for any reason (and thus fails to queue itself again as in regular non-periodic task). With locking you can make sure only one is running at a time.
check_loop()
:
First, it is recommended to save results in one transaction in the database to make sure all or nothing is saved / modified in the database.
You can also save some marker that indicates how many / status of saved objects, so future tasks can just check this marker, not each object.
Or somehow perform check for each element before creating it that it already exists in the database.