Search code examples
rabbitmqceleryamqpcelery-taskrabbitmqctl

send_task works only with a specific user


setup: Celery 4.1, RabbitMQ 3.6.1 (As broker), Redis (As backend, not relevant here).

Having two rabbit users:

  • admin_user with permissions of .* .* .*.
  • remote_user with permissions of ack ack ack.

admin_user can trigger tasks and is used by celery workers to handle tasks.

remote_user can only trigger one type of task - ack and is enqueued in a dedicated ack queue which later on being consumed by ack worker (by admin_user).

The remote_user sends the task by the following code:

from celery import Celery

app = Celery('remote', broker='amqp://remote_user:remote_pass@<machine_ip>:5672/vhost')
app.send_task('ack', args=('a1', 'a2'), queue='ack', route_name='ack')

This works perfectly in Celery 3.1. After upgrade to Celery 4.1 it doesn't send the task anymore. The call returns an AsyncResult but I don't see the message in Celery flower (or via rabbit management ui), or in the logs.

  • Trying to set permissions to remote_user .* .* .* as in the admin_user - doesn't help.
  • Trying to add administrator tag - doesn't help.

Changing the user of the broker to 'amqp://admin_user:admin_pass@<machine_ip>:5672/vhost' DOES work :

from celery import Celery

app = Celery('remote', broker='amqp://admin_user:admin_pass@<machine_ip>:5672/vhost')
app.send_task('ack', args=('a1', 'a2'), queue='ack', route_name='ack')

But I don't want to give a remote machine the admin_user permissions. Any idea what I can do?


Solution

  • Solved, API changed I guess, but to stay with the current permissions of RabbitMQ I had to use the following route:

    old_celery_config.py: (celery 3.1)

    CELERY_ROUTES = {
        'ack_task': {
            'queue': 'geo_ack'
        }
    }
    

    celery_config.py: (celery 4.1)

    CELERY_ROUTES = {
        'ack_task': {
            'exchange': 'ack',
            'exchange_type': 'direct',
            'routing_key': 'ack'
        }
    }
    

    run_task.py:

    from celery import Celery
    
    app = Celery('remote', broker='amqp://remote_user:remote_pass@<machine_ip>:5672/vhost')
    app.config_from_object('celery_config')
    app.send_task('ack_task', args=('a1', 'a2'))