celery, django-celery

Is it possible to use custom routes for celery's canvas primitives?


I have distinct RabbitMQ queues, each dedicated to a special kind of order processing:

# tasks.py

@celery.task
def process_order_for_product_x(order_id):
    pass  # elided ...


@celery.task
def process_order_for_product_y(order_id):
    pass  # elided ...


# settings.py

CELERY_QUEUES = {
    "black_hole": {
        "binding_key": "black_hole",
        "queue_arguments": {"x-ha-policy": "all"}
    },
    "product_x": {
        "binding_key": "product_x",
        "queue_arguments": {"x-ha-policy": "all"}
    },
    "product_y": {
        "binding_key": "product_y",
        "queue_arguments": {"x-ha-policy": "all"}
    },
}

We have a policy of enforcing explicit routing by setting CELERY_DEFAULT_QUEUE = 'black_hole' and then never consuming from black_hole.
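To show why the canvas tasks end up in black_hole, here is a minimal sketch of that policy written as Python settings, plus a hypothetical `queue_for` helper that imitates a dict-based route lookup falling back to CELERY_DEFAULT_QUEUE. It assumes the product tasks are registered under a module named `tasks`. Celery's real resolution also consults router objects, which this helper ignores.

```python
# settings.py (sketch): explicit per-task routes; anything not listed
# falls through to the never-consumed default queue.
# Assumption: the tasks are registered under the module name "tasks".
CELERY_DEFAULT_QUEUE = "black_hole"

CELERY_ROUTES = {
    "tasks.process_order_for_product_x": {"queue": "product_x"},
    "tasks.process_order_for_product_y": {"queue": "product_y"},
}


def queue_for(task_name):
    """Hypothetical helper: dict-route lookup with default-queue fallback."""
    route = CELERY_ROUTES.get(task_name)
    return route["queue"] if route else CELERY_DEFAULT_QUEUE


print(queue_for("tasks.process_order_for_product_x"))  # product_x
print(queue_for("celery.chord_unlock"))                # black_hole
```

Built-in tasks such as celery.chord_unlock have no entry in the table, so under this policy they are published to black_hole and never consumed.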

Each of these tasks may use celery's canvas primitives, like so:

# tasks.py

from celery import chain, group

@celery.task
def process_order_for_product_x(order_id):
    # These can run in parallel
    stage_1_group = group(do_something.si(order_id),
                          do_something_else.si(order_id))

    # These can run in parallel
    another_group = group(do_something_at_end.si(order_id),
                          do_something_else_at_end.si(order_id))

    # These run in a linear sequence
    process_task = chain(
        stage_1_group,
        do_something_dependent_on_stage_1.si(order_id),
        another_group)

    process_task.apply_async()

Supposing I want specific uses of celery.group, celery.chord, celery.chord_unlock, and the other canvas tasks to flow through the queue for their corresponding product rather than getting trapped in black_hole, is there a way to invoke each particular canvas task with either a custom task name or a custom routing_key?

For reasons I won't go into, I would prefer not to send all celery.* tasks to a catch-all celery_canvas queue, which is what I am doing in the meantime.


Solution

  • This approach lets you route Celery canvas tasks to the queue of their callback task.

    Celery lets you plug in a custom class-based task router, as described in the routers section of the Celery routing documentation.

    Let's focus on the celery.chord_unlock task. Its signature, as defined in Celery's source, begins:

    def unlock_chord(self, group_id, callback, ...):
    

    The second positional argument is the signature of the chord callback task.

    Task signatures in Celery are dict subclasses (celery.canvas.Signature inherits from dict), so the router can read the callback's task options, including the queue name, with ordinary dict access.
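Because a Signature is a dict, the same `.get('options')` lookup works on a Signature object and on the plain dict it turns into after JSON serialization. The sketch below checks that with `SignatureLike`, a hypothetical stand-in for celery.canvas.Signature that carries only the task, args, kwargs and options keys (a real Signature has a few more bookkeeping fields). `callback_queue` is likewise a hypothetical helper, not a Celery API.

```python
import json


class SignatureLike(dict):
    """Hypothetical stand-in for celery.canvas.Signature (a dict subclass)."""

    def __init__(self, task, args=(), kwargs=None, options=None):
        super().__init__(task=task, args=list(args),
                         kwargs=dict(kwargs or {}), options=dict(options or {}))


def callback_queue(sig):
    """Return the 'queue' option of a signature-shaped dict, or None."""
    return (sig.get("options") or {}).get("queue")


sig = SignatureLike("tasks.do_something_at_end", (42,),
                    options={"queue": "product_x"})
# Round-trip through JSON: the subclass comes back as a plain dict.
wire = json.loads(json.dumps(sig))

print(type(wire).__name__, callback_queue(sig), callback_queue(wire))
# dict product_x product_x
```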

    Here is an example:

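    class CeleryRouter(object):
        """Route celery.chord_unlock to the queue of its chord callback."""

        def route_for_task(self, task, args=None, kwargs=None):
            if task == 'celery.chord_unlock' and args and len(args) > 1:
                # args is (group_id, callback, ...); the callback signature
                # is a dict whose 'options' may name the target queue.
                callback_signature = args[1]
                options = callback_signature.get('options')
                if options:
                    queue = options.get('queue')
                    if queue:
                        return {'queue': queue}
            # None lets the next router (or CELERY_DEFAULT_QUEUE) decide.
            return None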
    

    Add it to the Celery config:

    CELERY_ROUTES = (CeleryRouter(),)
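The router can be exercised without a broker. In Celery 3.x the entries of CELERY_ROUTES are consulted in order, and an entry may be a router object or a plain dict; the first route found wins, otherwise the task goes to CELERY_DEFAULT_QUEUE. The sketch below models that lookup with a hypothetical `resolve_queue` helper, restating the router so the snippet runs on its own and using a plain dict in place of a real callback signature. The `tasks.*` names and the `product_y` queue on the callback are illustrative assumptions.

```python
class CeleryRouter(object):
    """Sends celery.chord_unlock to the queue set on its chord callback."""

    def route_for_task(self, task, args=None, kwargs=None):
        if task == "celery.chord_unlock" and args and len(args) > 1:
            options = args[1].get("options") or {}
            if options.get("queue"):
                return {"queue": options["queue"]}
        return None  # defer to the next router or the default queue


# Static routes for the product tasks (assumed to live in module "tasks").
STATIC_ROUTES = {
    "tasks.process_order_for_product_x": {"queue": "product_x"},
    "tasks.process_order_for_product_y": {"queue": "product_y"},
}

ROUTERS = (CeleryRouter(), STATIC_ROUTES)


def resolve_queue(task, args=None, default_queue="black_hole"):
    """Hypothetical model: consult ROUTERS in order, first hit wins."""
    for router in ROUTERS:
        if isinstance(router, dict):
            route = router.get(task)
        else:
            route = router.route_for_task(task, args)
        if route:
            return route["queue"]
    return default_queue


callback = {"task": "tasks.do_something_at_end", "args": (42,),
            "kwargs": {}, "options": {"queue": "product_y"}}

print(resolve_queue("celery.chord_unlock", ("group-id", callback)))  # product_y
print(resolve_queue("tasks.process_order_for_product_x"))            # product_x
print(resolve_queue("celery.group"))                                 # black_hole
```

Putting CeleryRouter first means a chord_unlock whose callback names a queue is routed there before the static table is checked, while callbacks without a queue option still fall through to black_hole, so the explicit-routing policy stays intact.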