I am trying to find a solution for the below use case.
I have a DAG D1 with tasks T1 >> T2 >> T3, which reads a JSON file from GCS and prints it to the console.
I want to rerun D1 from T2 onwards, meaning that on restart T2 and T3 will run.
When I do this from the UI by clearing the state of task T2, it works fine.
However, I am trying to achieve the same thing through a script in another DAG D2, and for that I have written the Python code below.
The RERUN_TASK is supposed to clear the state of the "show_bq_table" task of the "dag_task_to_rerun" DAG.
But it gives this error: The clear_task_instances method requires Session parameter
How do I fix this? Where do I get this session from?
D2:
def rerun_dag_task(dag_name, task_name):
    dag_runs = DagRun.find(dag_id=dag_name)
    dag_runs_sorted = sorted(dag_runs, key=lambda dr: dr.execution_date, reverse=True)
    dag_run = dag_runs_sorted[0]
    task_run = dag_run.get_task_instance(task_id=task_name)
    clear_task_instances(tis=[task_run], dag=dag_run, dag_run_state=True)
default_args = {
    'owner': 'Airflow',
    'start_date': days_ago(1),
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'schedule_interval': None
}
dag = DAG(
    dag_id="dag_to_call_rerun",
    default_args=default_args,
    schedule_interval=None,
    catchup=False
)
START = BashOperator(task_id="start", do_xcom_push=False, bash_command='echo "starting rerun"', dag=dag)
STOP = BashOperator(task_id="stop", do_xcom_push=False, bash_command='echo "stopping rerun"', dag=dag)
RERUN_TASK = PythonOperator(
    task_id="rerun_dag_task",
    provide_context=True,
    python_callable=rerun_dag_task,
    trigger_rule=TriggerRule.ALL_SUCCESS,
    depends_on_past=False,
    op_kwargs={'dag_name': "dag_task_to_rerun", 'task_name': "show_bq_table"},
    dag=dag
)
START >> RERUN_TASK >> STOP
D1:
default_args = {
    'owner': 'Airflow',
    'start_date': days_ago(1),
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'schedule_interval': '@daily'
}
dag = DAG(
    dag_id="dag_task_to_rerun",
    default_args=default_args,
    catchup=False
)
config_path = str(Path(__file__).parent) + "/resources/demo_js.json"
config_data = read_config_json(config_path)
START = BashOperator(task_id="start", do_xcom_push=False, bash_command='echo "starting data fusion batch pipeline"', dag=dag)
STOP = BashOperator(task_id="stop", do_xcom_push=False, bash_command='echo "stopping data fusion batch pipeline"', trigger_rule=TriggerRule.NONE_SKIPPED, dag=dag)
SHOW_BQ_TABLE = PythonOperator(
    task_id="show_bq_table",
    provide_context=True,
    python_callable=show_bq_table,
    trigger_rule=TriggerRule.ALL_SUCCESS,
    depends_on_past=False,
    do_xcom_push=False,
    op_kwargs={'config_data': config_data, 'bucket': "white-codex-353213-demo"},
    dag=dag)
START >> SHOW_BQ_TABLE >> STOP
To fix your problem, you can open a SQLAlchemy session yourself with create_session and pass it to clear_task_instances:
from airflow.models import DagRun
from airflow.models.taskinstance import clear_task_instances
from airflow.utils.db import create_session  # in Airflow 2.x this is also available from airflow.utils.session

def rerun_dag_task(dag_name, task_name):
    # Find the most recent run of the target DAG
    dag_runs = DagRun.find(dag_id=dag_name)
    dag_runs_sorted = sorted(dag_runs, key=lambda dr: dr.execution_date, reverse=True)
    dag_run = dag_runs_sorted[0]
    task_run = dag_run.get_task_instance(task_id=task_name)
    # Open a session and pass it explicitly. The optional dag argument expects a DAG
    # object (not a DagRun), so it is omitted here; by default clear_task_instances
    # also puts the cleared DAG run back into a runnable state.
    with create_session() as session:
        clear_task_instances(tis=[task_run], session=session)
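If you would rather not manage the session yourself, Airflow also ships a provide_session decorator that injects one when the caller does not pass it. A minimal sketch, assuming Airflow 2.x import paths (on 1.10.x the decorator lives in airflow.utils.db):

from airflow.models import DagRun
from airflow.models.taskinstance import clear_task_instances
from airflow.utils.session import provide_session

@provide_session
def rerun_dag_task(dag_name, task_name, session=None, **context):
    # provide_session fills in the session argument with a managed SQLAlchemy session
    dag_runs = DagRun.find(dag_id=dag_name)
    last_run = sorted(dag_runs, key=lambda dr: dr.execution_date, reverse=True)[0]
    task_run = last_run.get_task_instance(task_id=task_name)
    clear_task_instances(tis=[task_run], session=session)

The PythonOperator definition in D2 does not need to change for this variant.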
And here is another solution, based on BashOperator and the Airflow CLI:
RERUN_TASK = BashOperator(
    task_id="rerun_dag_task",
    bash_command="LAST_START_DATE=$(airflow dags list-runs -d dag_task_to_rerun -o plain | sed -n 2p | awk '{print $4}') && airflow tasks clear -s $LAST_START_DATE -t show_bq_table -d -y dag_task_to_rerun",
    trigger_rule=TriggerRule.ALL_SUCCESS,
    depends_on_past=False,
    dag=dag
)
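A note on the CLI flags: -t takes a task-id regex, -d also clears everything downstream of the matched task (so stop is rerun as well), and -y skips the interactive confirmation. This variant assumes the airflow binary on the worker that runs D2 points at the same metadata database as the target DAG.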