Search code examples
pythoncelerycelery-task

how to execute celery tasks conditionally python


I am new to celery. I have a celery task that needs to be executed when a condition is met. Otherwise retry after few minutes. From the below code, I am stuck at how to retry the same task in else condition? Appreciate your help.

@app.task(bind=True,soft_time_limit=4 * 3600)
def task_message_queue(id, name=None, tid=None, src=None, dest=None, queue="MessageQueue"):
   ThreadLocalStore().set_data({"id": id, "tid": tid, "name": name,"src": src, "dest":dest})
   num_files = os.popen("find %s -type f | wc -l" % dest).read().strip().split('\n')[0]
   if num_files < 20:
     #Move files from src to destination
   else:
     #wait for 2 minutes and retry the task

Solution

  • You have to call retry to make celery retry the task and you can set the countdown so celery will wait for that much time and retry the task. Below is the code borrowed from official celery docs. Modify the @task decorator according to your needs and also self.retry

    from celery.task import task
    @app.task(bind=True,soft_time_limit=4 * 3600)
    def task_message_queue(self, id, name=None, tid=None, src=None, dest=None, queue="MessageQueue"):
       ThreadLocalStore().set_data({"id": id, "tid": tid, "name": name,"src": src, "dest":dest})
       num_files = os.popen("find %s -type f | wc -l" % dest).read().strip().split('\n')[0]
       try:
           if num_files < 20:
               #Move files from src to destination
           else:
               raise SOME_EXCEPTION
               #wait for 2 minutes and retry the task
       except SOME_EXCEPTION as exc:
           self.retry(exc=exc, countdown=TIME_TO_WAIT_BEFORE_RETRY)