Tags: celery, airflow, celery-task, airflow-scheduler

Airflow: How to run a task on multiple workers


I have just set up Airflow with the Celery executor, and here is a skeleton of my DAG:

dag = DAG('dummy_for_testing', default_args=default_args)

t1 = BashOperator(
    task_id='print_date',
    bash_command='date >> /tmp/dag_output.log',
    queue='test_queue',
    dag=dag)

t3 = BashOperator(
    task_id='print_host',
    bash_command='hostname >> /tmp/dag_output.log',
    queue='local_queue',
    dag=dag)

t2 = BashOperator(
    task_id='print_uptime',
    bash_command='uptime >> /tmp/dag_output.log',
    queue='local_queue',
    dag=dag)

t2.set_upstream(t3)
t2.set_upstream(t1)

I have 2 workers. One of them consumes only one queue, local_queue, and the other consumes two queues, local_queue and test_queue.

I want to run task 1 on just one machine, but tasks 2 and 3 on both machines. That is, on worker 1 (running just local_queue), t2 and t3 should run, and on worker 2 (running both local_queue and test_queue), all three (t1, t2 and t3) should run. The total number of task runs should be 5.

However, when I run this, only 3 tasks are run: 1) print_date runs on worker 2 (correct); 2) print_host runs on worker 1 only (incorrect; it should run on both workers); and 3) print_uptime runs on worker 2 only (also incorrect; it should run on both workers).

Can you please guide me on how to set this up so that 5 tasks are run? In production, I want to manage machines by grouping them into queues: for all machines having QUEUE_A -> do X, for all machines having QUEUE_B -> do Y, and so on.

Thank you


Solution

  • Instead of having one worker consume 2 queues, have each worker consume exactly one queue. So the worker commands should look like this:

    airflow worker -q test_queue
    airflow worker -q local_queue
    
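    As a side note, if `-q` is omitted the worker falls back to the `default_queue` option in `airflow.cfg` (tasks with no `queue` set are also routed there). A minimal sketch of the relevant config, assuming the classic `[celery]` section used by Airflow of this era:

    ```ini
    [celery]
    # Queue a worker consumes, and tasks are queued to, when none is given explicitly
    default_queue = local_queue
    ```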

    Then have two copies of the same task, but in different queues:

    dag = DAG('dummy_for_testing', default_args=default_args)
    
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date >> /tmp/dag_output.log',
        queue='test_queue',
        dag=dag)
    
    t3 = BashOperator(
        task_id='print_host',
        bash_command='hostname >> /tmp/dag_output.log',
        queue='local_queue',
        dag=dag)
    
    t3_2 = BashOperator(
        task_id='print_host_2',
        bash_command='hostname >> /tmp/dag_output.log',
        queue='test_queue',
        dag=dag)
    
    t2 = BashOperator(
        task_id='print_uptime',
        bash_command='uptime >> /tmp/dag_output.log',
        queue='local_queue',
        dag=dag)
    
    t2_2 = BashOperator(
        task_id='print_uptime_2',
        bash_command='uptime >> /tmp/dag_output.log',
        queue='test_queue',
        dag=dag)
    
    t2.set_upstream(t3)
    t2.set_upstream(t3_2)
    t2.set_upstream(t1)
    
    t2_2.set_upstream(t3)
    t2_2.set_upstream(t3_2)
    t2_2.set_upstream(t1)
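
    As a sanity check on the counting, here is a small stand-alone Python sketch (no Airflow needed; task names taken from the DAG above) that models the routing rule: with one worker per queue, each task instance is executed by exactly the worker consuming its queue, giving 5 task runs in total.

    ```python
    # Model of queue routing: each worker consumes exactly one queue,
    # and a task instance runs on the worker consuming its queue.
    workers = {
        'worker_1': 'local_queue',
        'worker_2': 'test_queue',
    }

    # task_id -> queue, matching the DAG in the answer above
    tasks = {
        'print_date': 'test_queue',
        'print_host': 'local_queue',
        'print_host_2': 'test_queue',
        'print_uptime': 'local_queue',
        'print_uptime_2': 'test_queue',
    }

    runs = sorted((task, worker)
                  for task, queue in tasks.items()
                  for worker, consumed in workers.items()
                  if consumed == queue)

    print(len(runs))  # -> 5
    ```

    Each duplicated task pair (print_host/print_host_2, print_uptime/print_uptime_2) lands on a different worker, which is exactly the "run on both machines" behavior the question asks for.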