I am running an Airflow instance hosted on Kubernetes. My use case needs the ETL DAG to be triggered multiple times from a master DAG, where the locations for which the ETL DAG runs have to be made are decided in one of the tasks of the master DAG itself. To achieve this dynamic flow I am using a PythonOperator in the master DAG to loop through the paths for which the ETL DAG has to be triggered, making a POST call to the trigger-DAG endpoint for each one (is there a better way to do this?). Since the pipeline inside the ETL DAG has to run one run after the other, I want the ETL DAG runs to be queued and executed only once the previous run has completed. For this I am trying to use the max_active_runs parameter of the DAG to queue the ETL DAG runs. Reference taken from here. But when I trigger multiple runs of the ETL DAG, the runs are still not queued: they are kept in the running state and get executed as soon as the first execution finishes.
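For reference, the ETL DAG is declared roughly like the minimal sketch below; the dag_id, dates and task are placeholders and the real ETL logic is omitted.

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

# Minimal sketch of the ETL DAG definition; identifiers are placeholders.
# max_active_runs=1 is what I expected to queue any extra triggered runs.
with DAG(
    dag_id="etl_dag",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,  # only triggered externally by the master DAG
    max_active_runs=1,
    catchup=False,
) as dag:
    run_etl = PythonOperator(
        task_id="run_etl",
        python_callable=lambda **context: None,  # actual ETL steps omitted
    )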
Can anyone suggest an alternative solution or a fix for the above problem?
So, to solve this issue, besides setting max_active_runs=1 in the DAG config (not that it helped), I took the following steps. First, I defined a task using a PythonOperator in the master DAG, which in turn used a TriggerDagRunOperator to trigger n runs of the ETL DAG. Here I tweaked the allowed states to include the failed state as well, so that failure scenarios can be left to be handled inside the ETL DAG. (If you want the remaining runs not to be executed after a failure, remove the failed state; the loop will then break and the task will fail.)
def trigger_n_runs(**kwargs):
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator
    from airflow.utils.state import State

    run = ...  # logic to find the number of runs goes here
    for n in range(run):
        triggersplitrun = TriggerDagRunOperator(
            task_id="n_runs_trigger",
            trigger_dag_id=kwargs["trigger_dag_name"],
            conf={},
            poke_interval=60,
            wait_for_completion=True,  # block until the triggered run reaches an allowed state
            allowed_states=[State.SUCCESS, State.FAILED],
            do_xcom_push=True,
        )
        trigger_state = triggersplitrun.execute(kwargs)
The runs are executed strictly one after another, since wait_for_completion is set to True: the next run is only triggered once the previous one has completed.
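For completeness, the callable above is attached to the master DAG with an ordinary PythonOperator; the wiring looks roughly like this (master_dag, decide_locations and the DAG/task names are placeholders for the real ones):

from airflow.operators.python import PythonOperator

# Sketch of the wiring inside the master DAG; "etl_dag", "master_dag" and
# "decide_locations" are placeholders for the real names.
trigger_etl_runs = PythonOperator(
    task_id="trigger_etl_runs",
    python_callable=trigger_n_runs,
    op_kwargs={"trigger_dag_name": "etl_dag"},
    dag=master_dag,
)

decide_locations >> trigger_etl_runs  # runs after the task that picks the locations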
Secondly, I defined a task using a BranchPythonOperator at the beginning of the ETL DAG to check whether the last run was successful and control the flow accordingly. (dag_ref is the DAG object from the context manager, i.e. the dag in with DAG(...) as dag:.)
LastRunStatus = BranchPythonOperator(
    task_id="LastRunStatus",
    python_callable=last_dag_run,
    op_kwargs={
        "task_id_on_success": "SuccessJob",
        "task_id_on_failure": "FailureJob",
        "dag_ref": dag,
    },
    provide_context=True,
)
The callable looked something like this:
def last_dag_run(**kwargs):
    # Most recent run of this DAG, including externally triggered ones
    this_dag_run = kwargs["dag_ref"].get_last_dagrun(include_externally_triggered=True)
    previous_dag_run = this_dag_run.get_previous_dagrun()
    if previous_dag_run is None:
        # Very first run: nothing to compare against, take the success branch
        return kwargs["task_id_on_success"]
    else:
        previous_run_status = previous_dag_run.state
        if previous_run_status == "success":
            return kwargs["task_id_on_success"]
        else:
            return kwargs["task_id_on_failure"]
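Finally, the branch task fans out to the two task ids referenced above; in the ETL DAG the downstream wiring looks roughly like this (SuccessJob and FailureJob are placeholders for the real ETL and failure-handling steps):

from airflow.operators.dummy import DummyOperator  # EmptyOperator in newer Airflow versions

# Placeholder downstream tasks: the real pipeline hangs off SuccessJob,
# while FailureJob handles the case where the previous run failed.
SuccessJob = DummyOperator(task_id="SuccessJob")
FailureJob = DummyOperator(task_id="FailureJob")

LastRunStatus >> [SuccessJob, FailureJob]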