I have just set up Airflow with the Celery executor, and here is a skeleton of my DAG:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG('dummy_for_testing', default_args=default_args)

# Only task assigned to test_queue
t1 = BashOperator(
    task_id='print_date',
    bash_command='date >> /tmp/dag_output.log',
    queue='test_queue',
    dag=dag)

t3 = BashOperator(
    task_id='print_host',
    bash_command='hostname >> /tmp/dag_output.log',
    queue='local_queue',
    dag=dag)

t2 = BashOperator(
    task_id='print_uptime',
    bash_command='uptime >> /tmp/dag_output.log',
    queue='local_queue',
    dag=dag)

t2.set_upstream(t3)
t2.set_upstream(t1)
I have 2 workers. One of them listens to only one queue, local_queue, and the other listens to two queues, local_queue and test_queue.
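For context, the workers are started roughly like this (the -q flag takes a comma-separated list of queues; the exact commands are paraphrased):
airflow worker -q local_queue               # worker 1
airflow worker -q local_queue,test_queue    # worker 2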
I want to run task 1 on just one machine, but tasks 2 and 3 on both machines. That is, on worker 1 (running just local_queue), t2 and t3 should run, and on worker 2 (running both local_queue and test_queue), all three tasks (t1, t2 and t3) should run. The total number of task runs should be 5.
However, when I run this, only 3 tasks are run: 1) print_date runs on worker 2 (correct), 2) print_host runs on worker 1 only (incorrect; it should run on both workers), and 3) print_uptime runs on worker 2 only (also incorrect; it should run on both workers).
Can you please guide me on how to set this up so that 5 tasks are run? In production, I want to manage machines by grouping them into queues: for all machines having QUEUE_A -> do X, for all machines having QUEUE_B -> do Y, and so on.
Thank you
Instead of having one worker listen to two queues, have each worker listen to one queue. The worker commands should then look like this:
airflow worker -q test_queue
airflow worker -q local_queue
Then create two copies of each task that should run on both machines, one per queue. Since each worker now consumes exactly one queue, the local_queue copy runs on worker 1 and the test_queue copy runs on worker 2:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG('dummy_for_testing', default_args=default_args)

# Runs only on the test_queue worker (worker 2)
t1 = BashOperator(
    task_id='print_date',
    bash_command='date >> /tmp/dag_output.log',
    queue='test_queue',
    dag=dag)

# local_queue copy of print_host (worker 1)
t3 = BashOperator(
    task_id='print_host',
    bash_command='hostname >> /tmp/dag_output.log',
    queue='local_queue',
    dag=dag)

# test_queue copy of print_host (worker 2)
t3_2 = BashOperator(
    task_id='print_host_2',
    bash_command='hostname >> /tmp/dag_output.log',
    queue='test_queue',
    dag=dag)

# local_queue copy of print_uptime (worker 1)
t2 = BashOperator(
    task_id='print_uptime',
    bash_command='uptime >> /tmp/dag_output.log',
    queue='local_queue',
    dag=dag)

# test_queue copy of print_uptime (worker 2)
t2_2 = BashOperator(
    task_id='print_uptime_2',
    bash_command='uptime >> /tmp/dag_output.log',
    queue='test_queue',
    dag=dag)

# Both uptime tasks wait on print_date and on both print_host copies
t2.set_upstream(t3)
t2.set_upstream(t3_2)
t2.set_upstream(t1)
t2_2.set_upstream(t3)
t2_2.set_upstream(t3_2)
t2_2.set_upstream(t1)
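Side note: set_upstream also accepts a list of tasks, so if you prefer, the dependency wiring at the end can be condensed to something like this (same behavior, just shorter):
t2.set_upstream([t1, t3, t3_2])
t2_2.set_upstream([t1, t3, t3_2])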