Search code examples
pythoncelery

How can I know the queues created in celery with -Q argument?


I want to load a different configuration for Celery workers depending on which queueu I'm initializing. Specially, I want to change its concurrency.

I have seen that concurrency can be changed if I load it in the config. For example, if I do:

celery_app = current_celery_app
# myconfig is a py file with all configuration, including concurrency
celery_app.config_from_object(myconfig, namespace='CELERY')

Then I saw that I can ask for signals, like:

@celeryd_init.connect
def configure_workers(sender=None, **kwargs):

    if 'celery' in celery_app.amqp.queues.keys():
        celery_app.config_from_object(config, namespace='CELERY')
        celery_app.conf.update(worker_concurrency=4)

    elif 'queue2' in celery_app.amqp.queues.keys():
        celery_app.config_from_object(config, namespace='queue2')
        celery_app.conf.update(worker_concurrency=2)

Here I saw that excluding the worker_concurrency from config and changing it with celery_app.conf.update(worker_concurrency=4) also works. However, this solution could fulfill my necessity if I could read the queues I'm starting.

To init the celery app I do:

celery -A run_api.celery worker -Q queue2

But if I print the queues in my first or second code example I'm always getting only one queue when executing celery_app.amqp.queues.keys(): celery (which is the default one). If I try any other signal, such as celeryd_after_setup, worker_ready, worker_start I see the queue queue2 if I execute celery_app.amqp.queues.keys(). However, if I try to do

celery_app.conf.update(worker_concurrency=2)

there's no change in concurrency. I'm out of ideas. How can I read celery queues passed in -Q argument in celeryd_init signal? or how can I change the concurrency after the worker is created?

This project is for FastAPI interconnection. I'm not using Django.


Solution

  • I found a workaround. It's not what I would like to, but I think it's more general than initializing celery with --concurrency, which was my last option in case I couldn't find a better one. My workaround:

    I found that you can start celery with -n option. This changes the name of the celery:

    celery -A run_api.celery worker -Q queue2 -n queue2_name
    

    Then in the signal:

    @celeryd_init.connect
    def configure_workers(sender=None, **kwargs):
    
        worker_name = sender.split("@")[-1]
        # sender is celery@what_you_put_after_-n
        # in this case queue2_name
    
        if 'queue2' in worker_name:
            celery_app.config_from_object(config, namespace='queue2')
            celery_app.conf.update(worker_concurrency=2)
        else:
            celery_app.config_from_object(config, namespace='CELERY')
            celery_app.conf.update(worker_concurrency=4)    
    

    I'm working with celery 5.2.6