Search code examples
djangocelerycelerybeat

Celery will continue to execute historical tasks after the worker terminates abnormally and restarts


I added the celery crontab task. When an exception occurs in the worker, I hope that the worker will no longer execute unfinished historical tasks. I used the following task base class and wanted to preprocess the task by judging the status of the worker, but it didn't achieve the result I expected. When I stop the worker, after the time exceeds the task execution time, I restart the worker, it still executes the expired task.

class MyTask(Task):

    def before_start(self, task_id, args, kwargs):
        inspect = app.control.inspect()
        active_workers = inspect.active()
        if not active_workers:
            print('worker not runing')
            return False
        print('worker runing')
        return super().before_start(task_id, args, kwargs)


@app.task(base=MyTask)
def test():
    return 'test'

I hope there is any method or parameter setting that can make the worker no longer execute expired tasks


Solution

  • When I created the PeriodicTask task, I set expire_seconds=5. When the scheduled task reaches the execution time point, if it is not executed for more than 5 seconds, the task will be marked as revoked, which can achieve the desired result.