Tags: python, django, celery, django-celery, celery-task

Send tasks to two separate workers in celery


I have two tasks in Celery and would like to send them to different workers, but I am not sure how to do this. I have looked at the task_routes section of the Celery documentation and tried a few things from Stack Overflow, but with no luck.

tasks.py

@app.task
def task1():
    pass  # does something


@app.task
def task2():
    pass  # does something else

I have two Celery workers and would like each of them to focus on one task: worker1 on task1 and worker2 on task2.

celery.py

import os
import ssl

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')

app = Celery('project')

app.conf.timezone = 'Europe/London'

app.config_from_object('django.conf:settings')
app.conf.update(
    BROKER_URL=str(os.getenv('REDIS_URL')),
    CELERY_RESULT_BACKEND=str(os.getenv('REDIS_URL')),
    broker_use_ssl={'ssl_cert_reqs': ssl.CERT_NONE},
    redis_backend_use_ssl={'ssl_cert_reqs': ssl.CERT_NONE},
)

app.autodiscover_tasks()

Then the Procfile:

web: gunicorn project.wsgi --log-file -
worker1: celery -A project worker -l INFO --concurrency=1 -Ofair -n worker1.%h
worker2: celery -A project worker -l INFO --concurrency=1 -Ofair -n worker2.%h

How do I set the queues up so that worker1 = task1 and worker2 = task2?


Solution

  • You can route task2 to its own queue with task_routes:

    # the key is the task's full import path, e.g. 'your.project.tasks.task2'
    app.conf.task_routes = {'your.project.tasks.task2': {'queue': 'some_special_queue'}}
    

    And run Celery as:

    # you may add -Q celery to the first command ('celery' is the default queue name when none is specified)
    celery -A project worker -l INFO --concurrency=1 -Ofair -n worker1.%h 
    celery -A project worker -l INFO --concurrency=1 -Ofair -n worker2.%h -Q some_special_queue
    

    Calls to task2 put their messages on some_special_queue, and only worker2 listens to that queue.

    All other tasks, including task1, go to the default celery queue, which worker1 consumes.