
How to configure Celery to work as a reliable send queue


I’m trying to use Celery to queue up HTTP requests for sending to another service. The most important requirement is that no data is lost, i.e. every request is delivered at least once. Additionally, I want to know why tasks have failed, even if they are retried.

I’ve had multiple failures with this already, so my current solution is to use a broad autoretry_for together with retry_backoff and acks_late:

from datetime import datetime

import celery


class SendTask(celery.Task):
    """Celery task to send data."""

    name = "app_name.send_data"
    autoretry_for = (Exception,)
    retry_backoff = 2  # Initial retry delay in seconds.
    retry_backoff_max = 24 * 3600  # Maximum delay between retries.
    max_retries = None  # Never stop trying.

    # Risk sending an event more than once rather than losing it when Celery fails.
    acks_late = True
    reject_on_worker_lost = True

    def __init__(self, client):
        self.client = client  # Wrapper around the requests library.

    def run(self, data, *args, **kwargs):  # pylint: disable=arguments-differ
        """Send data using a POST request."""
        data.setdefault("date", datetime.utcnow().isoformat())
        # Raises an exception on any failure or non-2xx HTTP response.
        self.client.post("endpoint", json=data)

Global configuration is just the broker:

CELERY = {
    "broker_url": "redis://localhost:6379/0",
}
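
For completeness, this is roughly how the task gets wired up and enqueued. A minimal sketch; the app instance and the HttpClient wrapper are assumptions, not part of the original setup:

import celery

# "HttpClient" is a hypothetical stand-in for the actual wrapper around requests.
app = celery.Celery("app_name")
app.conf.update(CELERY)

send_task = app.register_task(SendTask(client=HttpClient()))
send_task.delay({"event": "signup"})  # The worker POSTs the payload, retrying on failure.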
With this setup I’m observing two problems:

  1. Tasks keep being retried even after a retry succeeds, leading to a huge number of duplicates. There are no log messages about this, and there is no reliable way to monitor the queue (see the sketch after this list).
  2. Restarting the worker leads to lost tasks: although the logs show Restoring xxx unacknowledged message(s), the worker never resumes retrying after the restart. The data is lost.
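
The closest I’ve come to monitoring is inspecting the broker directly. A rough sketch, assuming the default Redis setup where pending messages sit in a list named "celery" (this counts only queued messages, not tasks a worker is already holding between retries):

import redis

# Hypothetical monitoring helper; assumes the default queue name "celery"
# on the Redis broker configured above.
broker = redis.Redis.from_url("redis://localhost:6379/0")
print(f"{broker.llen('celery')} message(s) waiting in the queue")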

Is this something that Celery is able and suited to handle? If so, is anything obviously wrong with my configuration or with my expectations of how it should work?


Solution

  • Short answer: use RabbitMQ. While Celery claims to support Redis as a broker, this does not seem to hold up in practice (a configuration sketch follows at the end of this answer).

    Answering my own question: it seems I have run into the same issue as in this question: Celery restart loss scheduled tasks … and came to the same conclusion.

    The related bug report for Celery is https://github.com/celery/kombu/issues/533, which was opened in 2015. I added steps to reproduce to the report back in May 2023 and have not received a reaction yet.
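
    Switching the broker only requires changing the broker URL; the task definition stays the same. A minimal sketch, assuming a local RabbitMQ instance with its default guest credentials:

    # Assumption: RabbitMQ running locally with default credentials.
    CELERY = {
        "broker_url": "amqp://guest:guest@localhost:5672//",
    }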