I want to load a different configuration for Celery workers depending on which queue I'm initializing. Specifically, I want to change the worker's concurrency.
I have seen that concurrency can be changed if I load it in the config. For example, if I do:
celery_app = current_celery_app
# myconfig is a py file with all configuration, including concurrency
celery_app.config_from_object(myconfig, namespace='CELERY')
Then I saw that I can connect to signals, like:
from celery.signals import celeryd_init

@celeryd_init.connect
def configure_workers(sender=None, **kwargs):
    if 'celery' in celery_app.amqp.queues.keys():
        celery_app.config_from_object(config, namespace='CELERY')
        celery_app.conf.update(worker_concurrency=4)
    elif 'queue2' in celery_app.amqp.queues.keys():
        celery_app.config_from_object(config, namespace='queue2')
        celery_app.conf.update(worker_concurrency=2)
Here I saw that leaving worker_concurrency out of the config and setting it with celery_app.conf.update(worker_concurrency=4) also works. However, this solution would only meet my needs if I could read the queues the worker is being started with.
To start the Celery worker I run:
celery -A run_api.celery worker -Q queue2
But if I print the queues in my first or second code example, celery_app.amqp.queues.keys() always returns only one queue: celery (which is the default one). If I try any other signal, such as celeryd_after_setup, worker_ready, or worker_start, celery_app.amqp.queues.keys() does include queue2. However, if I then try to do
celery_app.conf.update(worker_concurrency=2)
there's no change in concurrency. I'm out of ideas. How can I read the Celery queues passed with the -Q argument in the celeryd_init signal? Or how can I change the concurrency after the worker is created?
This project is for integration with FastAPI. I'm not using Django.
I found a workaround. It's not what I'd ideally like, but I think it's more general than starting celery with --concurrency, which was my last resort in case I couldn't find anything better. My workaround:
I found that you can start celery with the -n option. This changes the worker's node name:
celery -A run_api.celery worker -Q queue2 -n queue2_name
Then in the signal:
@celeryd_init.connect
def configure_workers(sender=None, **kwargs):
    worker_name = sender.split("@")[-1]
    # sender is celery@what_you_put_after_-n
    # in this case queue2_name
    if 'queue2' in worker_name:
        celery_app.config_from_object(config, namespace='queue2')
        celery_app.conf.update(worker_concurrency=2)
    else:
        celery_app.config_from_object(config, namespace='CELERY')
        celery_app.conf.update(worker_concurrency=4)
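The name check in the handler above could also be pulled into a small table-driven helper, which may be easier to extend as more queues are added. This is only a sketch: NAME_CONCURRENCY, DEFAULT_CONCURRENCY and concurrency_for are hypothetical names (not part of Celery), and the values simply mirror the ones used above.

```python
# Hypothetical lookup table: fragment of the -n name -> worker concurrency.
NAME_CONCURRENCY = {"queue2": 2}
DEFAULT_CONCURRENCY = 4  # value used for the default worker above


def concurrency_for(nodename):
    """Return the concurrency for a node name such as 'celery@queue2_name'."""
    # Keep only the part after the last '@', as in the handler above.
    worker_name = nodename.split("@")[-1]
    for fragment, concurrency in NAME_CONCURRENCY.items():
        if fragment in worker_name:
            return concurrency
    return DEFAULT_CONCURRENCY


print(concurrency_for("celery@queue2_name"))  # 2
print(concurrency_for("celery@my-host"))      # 4
```

Inside configure_workers this would be used as celery_app.conf.update(worker_concurrency=concurrency_for(sender)).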
I'm working with celery 5.2.6
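An untested alternative that might avoid depending on the node name: the Celery signals documentation lists conf and options arguments for celeryd_init, where options holds the options passed to the worker from the command line. Assuming that dict has a queues entry carrying the -Q value (as a list or a comma-separated string, which is not verified here against 5.2.6), a handler could look roughly like this. QUEUE_CONCURRENCY, DEFAULT_CONCURRENCY and normalize_queues are hypothetical names, and the try/except is only there so the snippet also runs where Celery is not installed.

```python
# Hypothetical mapping: queue name -> worker concurrency.
QUEUE_CONCURRENCY = {"queue2": 2}
DEFAULT_CONCURRENCY = 4


def normalize_queues(value):
    """Turn the -Q value (None, 'a,b' or ['a', 'b']) into a list of names."""
    if not value:
        return []
    if isinstance(value, str):
        return [q.strip() for q in value.split(",") if q.strip()]
    return [str(q) for q in value]


def configure_workers(sender=None, conf=None, options=None, **kwargs):
    # ASSUMPTION: options["queues"] carries what was passed with -Q.
    queues = normalize_queues((options or {}).get("queues"))
    concurrency = DEFAULT_CONCURRENCY
    for queue in queues:
        if queue in QUEUE_CONCURRENCY:
            concurrency = QUEUE_CONCURRENCY[queue]
            break
    # conf is the app configuration handed to the signal handler.
    conf.worker_concurrency = concurrency


try:
    from celery.signals import celeryd_init
    celeryd_init.connect(configure_workers)
except ImportError:
    pass  # Celery not installed; the functions above can still be exercised.
```

If options turns out not to contain the queues on a given Celery version, the handler simply falls back to DEFAULT_CONCURRENCY, so the -n workaround would still be needed in that case.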