
Airflow 2.2 TriggerDagRunOperator wait_for_completion behavior


I have a question about TriggerDagRunOperator, specifically its wait_for_completion parameter.

Before moving to Airflow 2.2, we used this operator to trigger another DAG and an ExternalTaskSensor to wait for its completion.

In Airflow 2.2, there is a new parameter called wait_for_completion which, when set to True, makes the task complete only once the triggered DAG has completed.

This is great, but I was wondering whether the worker will be released between pokes or not. I know that the ExternalTaskSensor used to have a reschedule mode that you could use for poke intervals longer than 1 minute, which releases the worker slot between pokes - but I don’t see it in the documentation anymore.

My question is whether the wait_for_completion parameter causes the operator to release the worker between pokes. From looking at the code I don’t think that is the case, so I just want to verify.


If it isn’t releasing the worker and the triggered DAG is bound to take more than 1 minute to finish, what would be the best approach here?

We are using Airflow 2.2 on MWAA, so I guess deferrable operators are not an option (if they would otherwise be a solution in this case).


Solution

  • For Apache-Airflow>=2.6.0:

    Assuming you have a Triggerer process running, you can use deferrable operators. The operator will then defer (not occupy a worker slot) when needed, according to the logic of the specific operator. To set it up, simply use:

    TriggerDagRunOperator(..., deferrable=True)
    
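    A fuller sketch of what this could look like in a DAG file (the DAG and task names here are hypothetical, not from the question):

    ```python
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator

    with DAG(
        dag_id="dag_a",                   # hypothetical parent DAG
        start_date=datetime(2023, 1, 1),
        schedule=None,
        catchup=False,
    ) as dag:
        trigger = TriggerDagRunOperator(
            task_id="trigger_dag_b",
            trigger_dag_id="dag_b",       # hypothetical child DAG
            wait_for_completion=True,
            deferrable=True,              # requires a running Triggerer (Airflow >= 2.6.0)
        )
    ```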

  • For Apache-Airflow<2.6.0:

    When using wait_for_completion=True in TriggerDagRunOperator, the worker is not released for as long as the operator is running. You can see that in the operator implementation: it calls time.sleep(self.poke_interval) between checks.
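    To illustrate why the worker slot stays occupied, here is a simplified, self-contained sketch of that kind of blocking wait loop. This is illustrative only, not the actual Airflow source; get_state stands in for querying the triggered DagRun's state:

    ```python
    import time

    def wait_for_dag_run(get_state, poke_interval=60,
                         allowed_states=("success",),
                         failed_states=("failed",)):
        """Sketch of a blocking wait: the task stays on the worker,
        sleeping between state checks, until a terminal state is seen."""
        while True:
            state = get_state()  # stand-in for checking the triggered DagRun
            if state in allowed_states:
                return state
            if state in failed_states:
                raise RuntimeError(f"Triggered DAG finished with state {state!r}")
            time.sleep(poke_interval)  # the worker slot is held for this entire loop
    ```

    The key point is that the loop runs inside execute(), so the task process never leaves the worker until the triggered DAG reaches a terminal state.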

    As you pointed out, there are two ways to achieve the goal of verifying that the triggered DAG completed:

    1. DAG A Using TriggerDagRunOperator followed by ExternalTaskSensor
    2. Using TriggerDagRunOperator with wait_for_completion=True

    However, other than the resource issue you mentioned, the two options are not really equivalent.

    In option 1, if the triggered DAG fails, then the ExternalTaskSensor will fail.
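    For reference, option 1 might be sketched like this (DAG and task names are hypothetical). Note that ExternalTaskSensor still supports mode="reschedule", which releases the worker slot between pokes - the execution_date alignment shown here is an assumption about how the two DAGs are wired up:

    ```python
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator
    from airflow.sensors.external_task import ExternalTaskSensor

    with DAG(
        dag_id="dag_a",                       # hypothetical parent DAG
        start_date=datetime(2021, 1, 1),
        schedule_interval=None,
        catchup=False,
    ) as dag:
        trigger = TriggerDagRunOperator(
            task_id="trigger_dag_b",
            trigger_dag_id="dag_b",           # hypothetical child DAG
            # Align the triggered run's logical date with ours so the
            # sensor below can find it (assumption for this sketch)
            execution_date="{{ logical_date }}",
        )
        wait = ExternalTaskSensor(
            task_id="wait_for_dag_b",
            external_dag_id="dag_b",
            external_task_id=None,            # wait for the whole DAG run
            mode="reschedule",                # free the worker slot between pokes
            poke_interval=60,
        )
        trigger >> wait
    ```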

    In option 2 consider:

    from airflow.operators.trigger_dagrun import TriggerDagRunOperator
    my_op = TriggerDagRunOperator(
        task_id='task',
        trigger_dag_id="dag_b",
        ...,
        wait_for_completion=True,
        retries=2
    )
    

    If dag_b fails, then TriggerDagRunOperator will retry, and each retry invokes another DagRun of dag_b.

    Both options are valid. You need to decide which behavior is suitable for your use case.