In my parent dag, I have a child dag started with the TriggerDagRunOperator. That child dag goes ahead and does its own thing.
my_trigger = TriggerDagRunOperator(
    task_id="my_trigger",
    trigger_dag_id="child_dag",
    wait_for_completion=True,
    execution_date="{{ logical_date }}",
)
However, somewhere down the line in my parent dag, something fails and I just want to stop everything. There's no point in the child dag still running if something failed in the parent dag.
I can set the state of the my_trigger task to State.FAILED and that will fail the task, but it won't stop the child dag from continuing to run until it completes.
So my question is: how do you stop the dag started by the TriggerDagRunOperator? I looked through the members of my_trigger but I couldn't really find anything that would let me stop the dag. Does anyone have any ideas?
So it turns out you cannot use the TriggerDagRunOperator to stop the dag it started.
This is not even how it works internally in Airflow. In order to stop a dag, you must stop all its tasks. Setting a dag to a failed state will not work!
Therefore, the solution is to stop all of the dag's tasks. You can use DagRun.find to find the dag run you are concerned with:
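from airflow.models import DagRun

# DagRun.find returns a list; kwargs is the task context of the calling task
dag_run = DagRun.find(
    dag_id=my_trigger.trigger_dag_id,
    execution_date=kwargs["logical_date"],
)[0]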
Then, stop all its tasks in order to stop the dag:
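from airflow.utils.state import State

for task_instance in dag_run.get_task_instances():
    if task_instance.current_state() in (
        State.RUNNING,  # other active states (e.g. State.QUEUED) may need checking too
        State.SCHEDULED,
        State.UP_FOR_RESCHEDULE,
    ):
        # failing each active task is what actually stops the dag run
        task_instance.set_state(State.FAILED)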
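If it helps, the loop above can be wrapped in a small helper. This is only a rough sketch, not anything Airflow ships: the name fail_active_task_instances and the ACTIVE_STATES tuple are my own, and the plain strings are an assumption that they match Airflow 2.x state names (State.RUNNING and friends compare equal to these strings there). It is duck-typed, so it only relies on the three methods already used above: get_task_instances(), current_state() and set_state().

```python
# Assumption: these strings mirror Airflow 2.x task-instance state names.
# Extend or trim the tuple to suit your own dags.
ACTIVE_STATES = (
    "running",
    "queued",
    "scheduled",
    "up_for_reschedule",
    "up_for_retry",
    "deferred",
)
FAILED = "failed"


def fail_active_task_instances(dag_run, active_states=ACTIVE_STATES):
    """Mark every still-active task instance of dag_run as failed.

    dag_run is anything exposing get_task_instances(), whose items expose
    task_id, current_state() and set_state() (hypothetical helper).
    Returns the task ids that were changed, in iteration order.
    """
    stopped = []
    for task_instance in dag_run.get_task_instances():
        if task_instance.current_state() in active_states:
            task_instance.set_state(FAILED)
            stopped.append(task_instance.task_id)
    return stopped
```

In the parent dag you would pass it the dag_run returned by DagRun.find, and the returned list is handy for logging which child tasks were actually stopped.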