celery django-celery

Celery task stuck in STARTED permanently (not retried)


We run a Celery worker in a Docker container. If the container is killed (for example, the image is changed and the container is brought back up), we need the task to be retried. My task currently looks like this:

@app.task(bind=True, max_retries=3, default_retry_delay=5, task_acks_late=True)
def build(self, config, import_data):
    build_chain = chain(
        build_dataset_docstore.s(config, import_data),
        build_index.s(),
        assemble_bundle.s()
    ).on_error(handle_chain_error.s())

    return build_chain

@app.task(bind=True, max_retries=3, default_retry_delay=5, task_acks_late=True)
def build_dataset_docstore(self, config, import_data):
    # do lots of stuff
    ...

@app.task(bind=True, max_retries=3, default_retry_delay=5, task_acks_late=True)
def build_index(self, config, import_data):
    # do lots of stuff
    ...

@app.task(bind=True, max_retries=3, default_retry_delay=5, task_acks_late=True)
def assemble_bundle(self, config, import_data):
    # do lots of stuff
    ...

To imitate the container being restarted (worker being killed) I'm running the following script:

SLEEP_FOR=1

echo "-> Killing worker"
docker-compose -f docker/docker-compose-dev.yml kill worker

echo "-> Waiting $SLEEP_FOR seconds"
sleep $SLEEP_FOR

echo "-> Bringing worker back to life"
docker-compose -f docker/docker-compose-dev.yml start worker

Looking in Flower, I see the task is still STARTED... cool, but...

  • Why isn't it retried?
  • Do I need to handle this circumstance manually?
  • If so, what is the correct way to do this?

EDIT: from the docs:

If the worker won’t shutdown after considerate time, for being stuck in an infinite-loop or similar, you can use the KILL signal to force terminate the worker: but be aware that currently executing tasks will be lost (i.e., unless the tasks have the acks_late option set).

I'm using the acks late option, so why isn't this retrying?
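
To make the docs passage concrete, here is a toy model of why the timing of the acknowledgement matters when a worker is killed mid-task. It is only a sketch under simplifying assumptions: ToyBroker, run_until_killed and pending_after_kill are hypothetical names, not Celery or broker APIs, and the model assumes the broker requeues every unacknowledged message once the consumer's connection is gone.

```python
from collections import deque


class ToyBroker:
    """Hypothetical in-memory broker; not Celery's or any real broker's API."""

    def __init__(self):
        self.queue = deque()   # messages waiting for a consumer
        self.unacked = {}      # delivered but not yet acknowledged
        self._next_tag = 0

    def publish(self, body):
        self.queue.append(body)

    def deliver(self):
        # Hand the next message to a consumer and remember it until acked.
        body = self.queue.popleft()
        self._next_tag += 1
        self.unacked[self._next_tag] = body
        return self._next_tag, body

    def ack(self, tag):
        # An acknowledged message is gone for good.
        del self.unacked[tag]

    def connection_lost(self):
        # Assumption: everything the dead consumer never acked is requeued.
        self.queue.extend(self.unacked.values())
        self.unacked.clear()


def run_until_killed(broker, acks_late):
    """Simulate a worker that is killed while the task is still running."""
    tag, body = broker.deliver()
    if not acks_late:
        broker.ack(tag)  # early ack: sent before the work starts
    # The worker is killed here, so a late ack is never sent.
    broker.connection_lost()


def pending_after_kill(acks_late):
    """Return the messages still waiting in the queue after the kill."""
    broker = ToyBroker()
    broker.publish("build")
    run_until_killed(broker, acks_late)
    return list(broker.queue)


print(pending_after_kill(acks_late=False))  # []
print(pending_after_kill(acks_late=True))   # ['build']
```

In this model the early-acked message disappears with the worker, while the late-acked one is back in the queue for the restarted worker to pick up.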


Solution

  • The issue here seems to be task_acks_late (https://docs.celeryproject.org/en/latest/userguide/configuration.html#task-acks-late), which I assume is a setting for the Celery app rather than an option on the task.

    I changed task_acks_late to acks_late and added reject_on_worker_lost, and it now works as expected.

    Thus:

    @app.task(bind=True, max_retries=3, default_retry_delay=5, acks_late=True, reject_on_worker_lost=True)
    def assemble_bundle(self, config, import_data):
        # do lots of stuff
        ...
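
If the same behaviour is wanted for every task rather than one decorator at a time, the app-level counterparts of these two options can presumably be set once in configuration. A minimal sketch, assuming a configuration module (the file name celeryconfig.py is hypothetical) that the app loads with app.config_from_object("celeryconfig"):

```python
# celeryconfig.py (hypothetical module name)
# App-wide defaults; per-task options such as acks_late=True on the
# decorator still take precedence for individual tasks.

# Acknowledge messages after the task has run instead of just before.
task_acks_late = True

# Requeue the message if the worker process running the task is lost.
task_reject_on_worker_lost = True
```

Two caveats from the Celery docs apply either way: tasks that are acknowledged late may run more than once, so they should be idempotent, and reject_on_worker_lost can cause message loops if a task reliably kills its worker.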