python queue timeout celery

Python Celery - Raise if queue is not available


I have defined a route in my Celery configs: task_routes = {'tasks.add': {'queue': 'calculate'}}

So that only a specific worker will run that task. I start my worker: celery -A myproj worker -n worker1@%h -Q calculate

And then run my task: add.apply_async((2, 2), time_limit=5)

Everything goes well. But now, let's say my worker dies and I try to run my task again. It hangs forever. time_limit doesn't do me any good, since the task will never get past its queue. How can I define a timeout in this case? In other words, if no queue is available in the next X seconds, I'd like to raise an error. Is that possible?


Solution

  • I'm assuming you are using RabbitMQ as the message broker, and if you are, there are a few subtleties about how RabbitMQ (and other AMQP-like message queues) work. First of all, when you send a message, your process sends it to an exchange, which in turn routes the message to zero or more queues. Your queue may or may not have a consumer (i.e. a Celery worker) consuming messages, but as a sender you have no control over the receiving side unless there is an active reply from that worker.

    However, I think it is possible to achieve what you want by doing the following (assuming you have a result backend configured):

    1. Make sure your queue is declared with a message TTL of your choice (let's say 60 seconds). Also make sure it is not declared as auto-delete, so that it is not deleted when no consumers are attached. Finally, declare a dead-letter exchange for it.
    2. Have a second Celery worker consume from your dead-letter exchange, and make that worker raise an appropriate exception whenever it receives a message. The easiest approach is probably to consume the messages without having any tasks loaded. That way, the task ends up as a FAILURE in your backend, with an error saying something about a task that is not implemented.

    If your original worker dies, any message in the queue will expire after your selected TTL and be sent to your dead-letter exchange, at which point the second worker (the auto-failing one) will receive the message and fail the task. Note that you need to set your TTL well above the time you expect a message to linger in the RabbitMQ queue, as it will expire regardless of whether a worker is consuming from the queue or not.
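To make the TTL caveat concrete, here is a minimal toy model of a queue with a message TTL and a dead-letter target. It uses only the standard library and does not talk to RabbitMQ; the `TtlQueue` class, the simulated clock, and the task names are all hypothetical, invented for this illustration.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class Message:
    body: str
    enqueued_at: float  # seconds on a simulated clock


@dataclass
class TtlQueue:
    """Toy stand-in for a queue with a message TTL and a dead-letter target."""
    ttl_seconds: float
    messages: deque = field(default_factory=deque)
    dead_lettered: list = field(default_factory=list)

    def publish(self, body: str, now: float) -> None:
        self.messages.append(Message(body, now))

    def expire(self, now: float) -> None:
        # The queue is FIFO, so the oldest message is always at the head.
        while self.messages and now - self.messages[0].enqueued_at >= self.ttl_seconds:
            self.dead_lettered.append(self.messages.popleft().body)

    def consume(self, now: float):
        """Return the next live message body, or None if nothing is left."""
        self.expire(now)
        return self.messages.popleft().body if self.messages else None


# Case 1: no worker is attached, so the message expires and is dead-lettered.
idle = TtlQueue(ttl_seconds=60)
idle.publish("add(2, 2)", now=0)
idle.expire(now=61)

# Case 2: a worker is attached but needs 45 s per task. The third message
# is still waiting at t=90 and expires even though the worker is alive.
busy = TtlQueue(ttl_seconds=60)
for i in range(3):
    busy.publish(f"task-{i}", now=0)
handled = [busy.consume(now=t) for t in (0, 45, 90)]
```

In the second case `handled` ends with `None` and `task-2` lands in `busy.dead_lettered`, which is why the TTL should sit comfortably above the worst backlog you expect.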

    To set up the first queue, I think you need a configuration looking something like:

    from kombu import Queue

    # default_queue_name, default_exchange, etc. are placeholders for your own values.
    Queue(
        default_queue_name,
        default_exchange,
        routing_key=default_routing_key,
        queue_arguments={
            'x-message-ttl': 60000,  # milliseconds
            'x-dead-letter-exchange': deadletter_exchange_name,
            'x-dead-letter-routing-key': deadletter_routing_key,
        },
    )
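Since the TTL argument is in milliseconds while most people think about timeouts in seconds, a small helper can build the arguments dictionary. This is only a sketch: `dead_letter_arguments` is a hypothetical name, not part of Celery or kombu, and rejecting non-positive TTLs is a choice made here rather than a broker requirement.

```python
def dead_letter_arguments(ttl_seconds, exchange, routing_key):
    """Build queue arguments for a TTL queue that dead-letters expired messages."""
    if ttl_seconds <= 0:
        # Assumption of this sketch: a non-positive TTL is treated as a mistake.
        raise ValueError("ttl_seconds must be positive")
    return {
        "x-message-ttl": int(ttl_seconds * 1000),  # seconds -> milliseconds
        "x-dead-letter-exchange": exchange,
        "x-dead-letter-routing-key": routing_key,
    }


# Example values; the exchange and routing key names are made up.
arguments = dead_letter_arguments(60, "calculate.dlx", "calculate.dead")
```

The resulting dictionary could then be passed as `queue_arguments` when declaring the queue.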
    

    The dead-letter queue would look more like a standard Celery worker queue configuration, but you may want a separate config for it, since you don't want to load any tasks for this worker.

    So to sum up: yes, it is possible, but it is not as straightforward as one might think.