Tags: python, google-cloud-platform, airflow, google-cloud-composer

How to rerun an Airflow task by clearing its state


I am trying to find a solution for the below use case.

I have a DAG D1 with tasks T1 >> T2 >> T3, which reads a JSON file from GCS and prints it to the console.

I want to rerun D1 from T2 onwards, meaning that on restart T2 and T3 will run.

When I do this from the UI by clearing the state of the T2 task, it works fine.

However, I am trying to achieve the same thing from a script in another DAG D2, and for that I have written the Python code below.

The RERUN_TASK is supposed to clear the state of the "show_bq_table" task of the "dag_task_to_rerun" DAG.

But it is giving this error: "The clear_task_instances method requires Session parameter".

How can I fix this? Where do I get this session?

D2:

from airflow import DAG
from airflow.models import DagRun
from airflow.models.taskinstance import clear_task_instances
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule


def rerun_dag_task(dag_name, task_name):
    # Find the most recent run of the target DAG and clear the given task.
    dag_runs = DagRun.find(dag_id=dag_name)
    dag_runs_sorted = sorted(dag_runs, key=lambda dr: dr.execution_date, reverse=True)
    dag_run = dag_runs_sorted[0]
    task_run = dag_run.get_task_instance(task_id=task_name)
    clear_task_instances(tis=[task_run], dag=dag_run, dag_run_state=True)  # fails: no session


default_args = {
    'owner': 'Airflow',
    'start_date': days_ago(1),
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0
}

dag = DAG(
    dag_id="dag_to_call_rerun",
    default_args=default_args,
    schedule_interval=None,
    catchup=False
)

START = BashOperator(task_id="start", do_xcom_push=False, bash_command='echo "starting rerun"', dag=dag)
STOP = BashOperator(task_id="stop", do_xcom_push=False, bash_command='echo "stopping rerun"', dag=dag)

RERUN_TASK = PythonOperator(
    task_id="rerun_dag_task",
    provide_context=True,
    python_callable=rerun_dag_task,
    trigger_rule=TriggerRule.ALL_SUCCESS,
    depends_on_past=False,
    op_kwargs={'dag_name': "dag_task_to_rerun", 'task_name': "show_bq_table"},
    dag=dag
)

START >> RERUN_TASK >> STOP

D1:

from pathlib import Path

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule

# read_config_json and show_bq_table are project helpers defined elsewhere.

default_args = {
    'owner': 'Airflow',
    'start_date': days_ago(1),
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0
}

dag = DAG(
    dag_id="dag_task_to_rerun",
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
)

config_path = str(Path(__file__).parent) + "/resources/demo_js.json"
config_data = read_config_json(config_path)

START = BashOperator(task_id="start", do_xcom_push=False, bash_command='echo "starting data fusion batch pipeline"', dag=dag)
STOP = BashOperator(task_id="stop", do_xcom_push=False, bash_command='echo "stopping data fusion batch pipeline"', trigger_rule=TriggerRule.NONE_SKIPPED, dag=dag)


SHOW_BQ_TABLE = PythonOperator(
    task_id="show_bq_table",
    provide_context=True,
    python_callable=show_bq_table,
    trigger_rule=TriggerRule.ALL_SUCCESS,
    depends_on_past=False,
    do_xcom_push=False,
    op_kwargs={'config_data': config_data, 'bucket': "white-codex-353213-demo"},
    dag=dag)

START >> SHOW_BQ_TABLE >> STOP

Solution

  • To fix your problem, open a session yourself and pass it to clear_task_instances:

    from airflow.models import DagRun
    from airflow.models.taskinstance import clear_task_instances
    from airflow.utils.session import create_session  # airflow.utils.db in Airflow 1.10

    def rerun_dag_task(dag_name, task_name):
        dag_runs = DagRun.find(dag_id=dag_name)
        dag_runs_sorted = sorted(dag_runs, key=lambda dr: dr.execution_date, reverse=True)
        dag_run = dag_runs_sorted[0]
        task_run = dag_run.get_task_instance(task_id=task_name)
        with create_session() as session:
            # clear_task_instances expects a DAG object for its optional dag
            # argument, not a DagRun, so it is omitted here; the default
            # dag_run_state puts the cleared run back into a schedulable state.
            clear_task_instances(tis=[task_run], session=session)
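
    Note that clear_task_instances clears exactly the task instances you pass it, so if T3 should rerun as well, its task instance must be included in tis (the UI and the CLI's -d flag collect downstream tasks for you). As an alternative to create_session, here is a minimal sketch that lets Airflow inject the session through the provide_session decorator (assuming Airflow 2.x import paths):

    from airflow.models import DagRun
    from airflow.models.taskinstance import clear_task_instances
    from airflow.utils.session import provide_session

    @provide_session
    def rerun_dag_task(dag_name, task_name, session=None):
        # provide_session supplies a SQLAlchemy session when the caller does
        # not pass one, and commits and closes it when the function returns.
        dag_runs = DagRun.find(dag_id=dag_name)
        latest_run = max(dag_runs, key=lambda dr: dr.execution_date)
        task_run = latest_run.get_task_instance(task_id=task_name)
        clear_task_instances(tis=[task_run], session=session)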
    

    And here is another solution, based on a BashOperator and the Airflow CLI:

    RERUN_TASK = BashOperator(
        task_id="rerun_dag_task",
        bash_command="LAST_START_DATE=$(airflow dags list-runs -d dag_task_to_rerun -o plain | sed -n 2p | awk '{print $4}') && airflow tasks clear -s $LAST_START_DATE -t show_bq_table -d -y dag_task_to_rerun",
        trigger_rule=TriggerRule.ALL_SUCCESS,
        depends_on_past=False,
        dag=dag
    )
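
    Here airflow dags list-runs prints the runs of dag_task_to_rerun as a plain table; sed -n 2p takes the first data row (the most recent run) and awk '{print $4}' extracts that run's date, which is then passed to airflow tasks clear as the start of the range to clear. The -d flag also clears the downstream tasks (so T3 reruns too, matching the UI's default "clear downstream" behaviour), and -y skips the confirmation prompt.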