
Celery's task_reject_on_worker_lost doesn't work with Redis as message broker


I'm using Celery 5.2.6 and Redis 6.2.6. With the task_reject_on_worker_lost flag turned on, I expect Celery to redeliver a task whose worker died abruptly. With Redis as the message broker, however, the task is not redelivered immediately after the worker goes down. With the exact same configuration on RabbitMQ, it works as expected.

Any pointers on how to achieve the same behavior with Redis as the message broker?


Solution

  • I'm new to Celery and ran into the same issue. With this ack configuration:

    task_acks_late = True                       # ack after the task finishes, not before it starts
    task_acks_on_failure_or_timeout = True      # ack even if the task raises or times out
    task_reject_on_worker_lost = True           # reject (requeue) instead of ack if the worker process is lost
    

    and Redis as the broker:

    broker_url = 'redis://127.0.0.1:6379/1'
    

    the task is not requeued when the worker is killed while running it and then restarted.

    With RabbitMQ as the broker:

    broker_url = 'amqp://guest:guest@localhost:5672/'
    

    the task is requeued and runs again.

    My environment

    • linux 5.15.10-arch1-1
    • python 3.8.13
    • celery==5.2.7

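    Eventually I found this comment in the Celery GitHub issues (https://github.com/celery/celery/issues/4984).

    The Redis broker additionally needs visibility_timeout set in broker_transport_options. With Redis, an unacknowledged task is redelivered only after the visibility timeout has elapsed, and the default is one hour, which is why the task does not come back right after the worker dies.

    After adding this setting to my config, the task is redelivered as expected.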
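    To make the effect of visibility_timeout concrete, here is a toy, in-memory model of a broker that puts an unacknowledged message back on the queue once the timeout has passed. It is only a sketch: ToyBroker and seconds_until_redelivery are hypothetical names invented for this illustration, time is a plain integer counter, and none of this is how Celery or kombu actually implement the Redis transport.

```python
import collections


class ToyBroker:
    """Toy model of a broker with a visibility timeout (illustration only).

    An unacknowledged message is put back on the queue once
    `visibility_timeout` seconds have passed since it was delivered.
    """

    def __init__(self, visibility_timeout):
        self.visibility_timeout = visibility_timeout
        self.queue = collections.deque()
        self.unacked = {}  # message -> time at which it was delivered

    def publish(self, message):
        self.queue.append(message)

    def deliver(self, now):
        """Hand the next message to a worker, restoring timed-out ones first."""
        self._restore_timed_out(now)
        if not self.queue:
            return None
        message = self.queue.popleft()
        self.unacked[message] = now
        return message

    def ack(self, message):
        self.unacked.pop(message, None)

    def _restore_timed_out(self, now):
        for message, delivered_at in list(self.unacked.items()):
            if now - delivered_at >= self.visibility_timeout:
                del self.unacked[message]
                self.queue.append(message)


def seconds_until_redelivery(visibility_timeout, horizon=7200):
    """Simulate a worker that takes a task at t=0 and is killed before acking."""
    broker = ToyBroker(visibility_timeout)
    broker.publish("task-1")
    broker.deliver(now=0)  # the worker dies here, so no ack is ever sent
    for now in range(1, horizon + 1):
        if broker.deliver(now=now) == "task-1":
            return now
    return None  # not redelivered within the simulated horizon


print(seconds_until_redelivery(10))    # 10
print(seconds_until_redelivery(3600))  # 3600
```

    In this model, a timeout of 3600 (the one-hour default) makes the lost task look like it never comes back, while a timeout of 10 brings it back quickly. The same model also hints at the trade-off: with task_acks_late enabled, a task that legitimately runs longer than the timeout has not been acknowledged yet either, so a very short timeout can cause it to be delivered a second time while it is still running.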

    FYI, here are my config files:

    • celery_config.py
    broker_url = 'redis://127.0.0.1:6379/1'
    result_backend = 'redis://127.0.0.1:6379/2'
    
    # task message ack
    # https://docs.celeryq.dev/en/stable/userguide/configuration.html#std-setting-task_acks_late
    # https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-acks-on-failure-or-timeout
    # https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-reject-on-worker-lost
    task_acks_late = True                       # ack after the task finishes, not before it starts
    task_acks_on_failure_or_timeout = True      # ack even if the task raises or times out
    task_reject_on_worker_lost = True           # reject (requeue) instead of ack if the worker process is lost
    # only for redis broker
    # https://github.com/celery/celery/issues/4984
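    # visibility_timeout is in seconds; keep it longer than your longest-running task,
    # otherwise a task that is still running (and not yet acked) can be redelivered
    broker_transport_options = {'visibility_timeout': 10}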
    
    • app.py
    import celery
    
    import celery_config
    
    app = celery.Celery("celery")
    app.config_from_object(celery_config)