Tags: python, rabbitmq, celery, message-queue, messaging

Run multiple Celery tasks using a topic exchange


I'm replacing some homegrown code with Celery, but having a hard time replicating the current behaviour. My desired behaviour is as follows:

  • When creating a new user, a message should be published to the tasks exchange with the user.created routing key.
  • Two Celery tasks should be triggered by this message, namely send_user_activate_email and check_spam.

I tried implementing this by defining a user_created task with an ignore_result=True argument, plus one task each for send_user_activate_email and check_spam.

In my configuration, I added the following route and queue definitions. While the message is delivered to the user_created queue, it is not delivered to the other two queues.

Ideally, the message would be delivered only to the send_user_activate_email and check_spam queues. With vanilla RabbitMQ, messages are published to an exchange to which queues can bind, but Celery seems to deliver a message to a queue directly.

How would I implement the behaviour outlined above in Celery?

CELERY_QUEUES = {
    'user_created': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
    'send_user_activate_email': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
    'check_spam': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
}

CELERY_ROUTES = {
    'user_created': {
        'queue': 'user_created',
        'routing_key': 'user.created',
        'exchange': 'tasks',
        'exchange_type': 'topic',
    },
    'send_user_activate_email': {
        'queue': 'user_created',
        'routing_key': 'user.created',
        'exchange': 'tasks',
        'exchange_type': 'topic',
    },
    'check_spam': {
        'queue': 'user_created',
        'routing_key': 'user.created',
        'exchange': 'tasks',
        'exchange_type': 'topic',
    },
}

Solution

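  • It sounds like you are expecting a single message to trigger two different tasks by being routed to several queues, but this is not how Celery works. A topic exchange does put a copy of the message into every queue whose binding key matches, but a Celery message names exactly one task, so every copy would run that same task (here user_created) rather than send_user_activate_email or check_spam. You need one message per task you want to trigger.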

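    New Celery users are often confused because "queue" means two things here: the Kombu queues that Queue() and the documentation refer to, which are declarations of a queue name, an exchange and a binding key, and the AMQP queues on the broker, which actually hold messages and are consumed by workers. It is tempting to picture publishing straight into an AMQP queue, but that is incorrect: Celery publishes to an exchange with a routing key, and the broker decides which queues receive the message. (Thanks to the answer linked below.)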

    Back to your issue: if I understand correctly, when user_created is consumed you want it to spawn two more tasks, send_user_activate_email and check_spam. These should not depend on each other; they can run in parallel on separate machines and do not need to know each other's status.

    In that case, you want user_created to call apply_async on these two new tasks and then return. You can do this directly, or use a Celery group containing check_spam and send_user_activate_email. A group offers some nice shorthand and gives your tasks a bit of structure, so personally I'd nudge you in that direction.

    # pseudocode: fill in each task's own arguments
    from celery import group
    group(check_spam.s(... checkspam kwargs ...), send_user_activate_email.s(... activate email kwargs ...)).apply_async()

    This setup sends one message per task: the original user_created message, then one each for check_spam and send_user_activate_email. Calling apply_async() on the group returns a GroupResult that you can use to track both results.

    In your case, I am not sure the Exchange or ignore_result is necessary, but I'd need to see the Task code and understand the system more to make that judgement.

    Further reading:
    http://docs.celeryproject.org/en/latest/userguide/canvas.html#groups
    http://celery.readthedocs.org/en/v2.2.6/userguide/routing.html#exchanges-queues-and-routing-keys
    Why do CELERY_ROUTES have both a "queue" and a "routing_key"?

    (if I am way off I'll delete/remove the answer...)
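The claim that routing one message to several queues cannot start different tasks can be checked with a toy, pure-Python model of topic routing. This is not RabbitMQ or Celery code: topic_matches and publish are hypothetical helpers, and the message is reduced to a dict whose "task" field stands in for the task name a real Celery message carries in its headers. Every queue bound with a matching key gets its own copy, yet each copy still names user_created.

```python
# Toy model of an AMQP topic exchange (hypothetical helpers, not a real broker API).

def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Topic rules: '*' matches exactly one word, '#' matches zero or more words."""
    b = binding_key.split(".")
    r = routing_key.split(".")

    def match(i: int, j: int) -> bool:
        if i == len(b):
            return j == len(r)
        if b[i] == "#":
            # '#' may absorb any number of remaining words, including none
            return any(match(i + 1, k) for k in range(j, len(r) + 1))
        if j == len(r):
            return False
        return (b[i] == "*" or b[i] == r[j]) and match(i + 1, j + 1)

    return match(0, 0)


def publish(bindings: dict, routing_key: str, message: dict) -> dict:
    """Return {queue_name: copy_of_message} for every queue whose binding matches."""
    return {
        queue: dict(message)
        for queue, binding_key in bindings.items()
        if topic_matches(binding_key, routing_key)
    }


# The three bindings from the question's CELERY_QUEUES.
bindings = {
    "user_created": "user.created",
    "send_user_activate_email": "user.created",
    "check_spam": "user.created",
}

# Simplified message: a Celery message names exactly one task.
message = {"task": "user_created", "args": [42]}

delivered = publish(bindings, "user.created", message)
for queue, copy in delivered.items():
    print(queue, "->", copy["task"])
```

All three queues receive a copy, but a worker consuming any of them would run user_created, which is why one message per task is needed.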