How to switch tasks between queues in Celery


I've got a couple of tasks in my tasks.py in Celery.

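import uuid

# this should go to the 'math' queue
@app.task
def add(x, y):
    # Named task_id so the local variable does not shadow the uuid module;
    # converted to str so the default JSON serializer can handle it
    task_id = str(uuid.uuid4())
    result = x + y
    return {'id': task_id, 'result': result}

# this should go to the 'info' queue
@app.task
def notification(calculation):
    print(repr(calculation))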

What I'd like to do is place each of these tasks in a separate Celery queue and then assign a number of workers to each queue.

The problem is that I don't know of a way to move work from one queue to another from within my code.

So for instance, when an add task finishes execution, I need a way to place the resulting Python dictionary on the info queue for further processing. How should I do that?

Thanks in advance.

EDIT -CLARIFICATION-

As I said in the comments, the question essentially becomes: how can a worker place data retrieved from queue A onto queue B?


Solution

  • You can do it like this.

    Wherever you call a task, you can specify which queue it is sent to.

    add.apply_async(args=[x, y], queue="queuename1")
    
    notification.apply_async(args=[calculation], queue="queuename2")
    

    This way each task goes to its own queue.

    Start a worker for each queue:

    celery -A proj worker -Q queuename1 -l info
    
    celery -A proj worker -Q queuename2 -l info
    

    Note that the default queue is named celery, so any task sent without a queue name goes to the celery queue. If you have such tasks, a worker must also consume from celery, for example:

    celery -A proj worker -Q queuename1,celery -l info
    

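    To answer your actual question:

    If you want to pass the result of one task to another, first send the task and fetch its return value in the calling code:

    async_result = add.apply_async(args=[x, y], queue="queuename1")
    result = async_result.get()  # Blocks until the task finishes; needs a result backend
    

    Then pass that value to the second task:

    notification.apply_async(args=[result], queue="queuename2")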