airflow, google-cloud-composer

How to pass default values for a run-time input variable in Airflow for scheduled execution


I came across an issue while running a DAG in Airflow. My code works in two scenarios but fails in one. My scenarios are:

  1. Manual trigger with input - Running Fine
  2. Manual trigger without input - Running Fine
  3. Scheduled Run - Failing

Below is my code:

def decide_the_flow(**kwargs):
    cleanup=kwargs['dag_run'].conf.get('cleanup','N')
    print("IP is :",cleanup)
    return cleanup

I am getting the error below:

    cleanup=kwargs['dag_run'].conf.get('cleanup','N')
AttributeError: 'NoneType' object has no attribute 'get'

I tried to define default values like this:

default_dag_args = {
    'start_date':days_ago(0),
    'params': {
        "cleanup": "N"
    },
    'retries': 0
}

but it doesn't work. I am using BranchPythonOperator to call this function. Scheduling: (screenshot not reproduced). Can anyone please guide me here? What am I missing?

As a workaround, I am using the code below:

try:
    cleanup=kwargs['dag_run'].conf.get('cleanup','N')
except:
    cleanup="N"
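
A narrower variant of this workaround, as a minimal sketch: the traceback suggests dag_run.conf is None on scheduled runs, so falling back to an empty dict avoids the bare except, which would also hide unrelated errors. The helper name get_cleanup is hypothetical, and SimpleNamespace is only a stand-in for Airflow's dag_run object so the snippet runs without Airflow installed.

```python
from types import SimpleNamespace


def get_cleanup(dag_run, default="N"):
    """Return the 'cleanup' value from dag_run.conf, or the default."""
    # conf may be None (as the traceback above suggests), so fall back to {}.
    conf = getattr(dag_run, "conf", None) or {}
    return conf.get("cleanup", default)


# Stand-ins for Airflow's dag_run object (assumption, for illustration only).
scheduled_run = SimpleNamespace(conf=None)
manual_run = SimpleNamespace(conf={"cleanup": "Y"})

print(get_cleanup(scheduled_run))
print(get_cleanup(manual_run))
```

Inside the real callable, the same idea would be applied to kwargs['dag_run'].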

Solution

  • You can read the value from the params entry of the task context instead of dag_run.conf. Airflow fills params with the default values defined on the DAG and applies any values supplied in dag_run.conf on top, so the key is present even when a scheduled run passes no conf:

    from datetime import datetime
    
    from airflow import DAG
    from airflow.operators.empty import EmptyOperator
    from airflow.operators.python import BranchPythonOperator
    
    
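    def decide_the_flow(**kwargs):
        # params carries the DAG-level defaults plus any values passed at trigger time
        cleanup = kwargs['params']["cleanup"]
        print(f"IP is : {cleanup}")
        # BranchPythonOperator follows the task whose task_id is returned ("N" or "M" here)
        return cleanup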
    
    
    with DAG(
        dag_id='airflow_params',
        start_date=datetime(2022, 8, 25),
        schedule_interval="* * * * *",
        params={
            "cleanup": "N",
        },
        catchup=False
    ) as dag:
        branch_task = BranchPythonOperator(
            task_id='test_param',
            python_callable=decide_the_flow
        )
        task_n = EmptyOperator(task_id="N")
        task_m = EmptyOperator(task_id="M")
    
        branch_task >> [task_n, task_m]
    

    I just tested it in scheduled and manual (with and without conf) runs, and it works fine.
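
To picture why params always contains the key, here is a plain-Python sketch of the idea. It is not Airflow's actual implementation: resolve_params is a hypothetical helper, and it assumes the Airflow configuration allows trigger-time conf to override DAG-level params.

```python
def resolve_params(dag_params, run_conf):
    """Overlay trigger-time conf on top of DAG-level defaults."""
    # Start from a copy so the DAG-level defaults are never mutated.
    merged = dict(dag_params)
    # A scheduled run may carry no conf at all, so treat None as empty.
    merged.update(run_conf or {})
    return merged


defaults = {"cleanup": "N"}

print(resolve_params(defaults, None))              # scheduled run
print(resolve_params(defaults, {}))                # manual run without conf
print(resolve_params(defaults, {"cleanup": "M"}))  # manual run with conf
```

In all three cases the cleanup key exists, which is why the callable can index params directly without a try/except.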