python, airflow, airflow-2.x, airflow-taskflow

Airflow: do not start tasks in DAG1 while DAG2 is unfinished


I have DAG1 and DAG2.

There are tasks in DAG1: t11 >> t12. Tasks in DAG2: t21 >> t22.

The settings are such that only one task can be executed at a time.

I want the tasks in each DAG to run only after the other DAG has finished. In practice, for example, t11 can start right after t21. At that moment t12 is queued.

How can I achieve a strict sequence of tasks within each DAG?


Solution

  • Updated Answer:

    I think I understand your situation now: x independent DAGs, each with a chain of tasks that needs to finish before any other DAG starts. This is an interesting puzzle! I haven't tested this, but I think it could work with a combination of pools and priority weights.

    First, create a new Airflow pool with only 1 worker slot and assign all of your tasks to this pool. This makes sure that only 1 task ever runs at any given time.

    Next, set the priority weights for the tasks in your DAG to increase with each sequential step:

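    # weight_rule='absolute' makes Airflow use these weights as-is. The default
    # rule ('downstream') adds the weights of all downstream tasks, which would
    # give task_1 the highest effective weight instead of task_3.
    task_1 = YourOperator(task_id='task_1', pool=POOL_NAME, priority_weight=1, weight_rule='absolute')
    task_2 = YourOperator(task_id='task_2', pool=POOL_NAME, priority_weight=2, weight_rule='absolute')
    task_3 = YourOperator(task_id='task_3', pool=POOL_NAME, priority_weight=3, weight_rule='absolute')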
    
    task_1 >> task_2 >> task_3 
    

    In theory this should ensure that once a chain is started it completes before any other "first tasks" in other DAGs run.
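
One thing worth checking is which weight rule is in effect, because the ordering only holds if the declared weights are used unchanged. That is what `weight_rule="absolute"` does. Under Airflow's default rule, `downstream`, a task's effective weight is its own weight plus the weights of all its downstream tasks. The sketch below is a simplified plain-Python model of that arithmetic for a linear chain. It is not Airflow's own code, and `effective_weights` is a hypothetical helper written only for this illustration.

```python
# Simplified model of Airflow's weight rules for a linear chain of tasks.
# `effective_weights` is a hypothetical helper, not part of Airflow.

def effective_weights(chain, rule):
    """Return {task_id: effective weight} for tasks given in execution order.

    chain: list of (task_id, priority_weight) tuples, first task first.
    rule:  "downstream", "upstream" or "absolute".
    """
    weights = [weight for _, weight in chain]
    result = {}
    for i, (task_id, own) in enumerate(chain):
        if rule == "downstream":
            # Own weight plus the weights of every task that comes after it.
            result[task_id] = own + sum(weights[i + 1:])
        elif rule == "upstream":
            # Own weight plus the weights of every task that comes before it.
            result[task_id] = own + sum(weights[:i])
        elif rule == "absolute":
            # The declared weight is used unchanged.
            result[task_id] = own
        else:
            raise ValueError(f"unknown weight rule: {rule}")
    return result


chain = [("task_1", 1), ("task_2", 2), ("task_3", 3)]

downstream = effective_weights(chain, "downstream")
absolute = effective_weights(chain, "absolute")

print(downstream)  # {'task_1': 6, 'task_2': 5, 'task_3': 3}
print(absolute)    # {'task_1': 1, 'task_2': 2, 'task_3': 3}
```

With the default rule the first task of every chain ends up with the highest effective weight (6), so a waiting "first task" from another DAG could win the single pool slot over task_2. With `absolute`, the later steps of a chain that has already started outrank any first task.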


    Previous answer, keeping this for users who want a dependency between the DAGs:

    You can trigger one DAG after a specific task in another DAG has finished by using Datasets.

    For example, let's say you want the following order: t11 >> t12 --DAG2 starts--> t21 >> t22

    You can do this by adding the following parameter to t12:

    from airflow.datasets import Dataset
    
    t12 = MyOperator(
        task_id="t12",
        ....
        outlets=[Dataset("x-my-dataset")]
    )
    

    In DAG2, you then use the same Dataset as the schedule instead of a time-based schedule:

    from airflow import DAG
    from airflow.datasets import Dataset
    
    with DAG(
       dag_id="DAG2",
       ...
       schedule=[Dataset("x-my-dataset")]
    ):
        ...  # t21 >> t22 are defined here
    

    Note that this only works in Airflow 2.4 and later. If you are on an earlier version, you can use the TriggerDagRunOperator instead.

    You can find more information on Datasets in this guide. Hope this helps!