
Airflow Use execution date for logic within DAG


Is it possible to access the execution date within the DAG, outside of an Operator? I need to use the execution date for conditional logic: if the execution date is the 1st of the month, I need to build a dictionary with certain values; otherwise, I need to build the dictionary with other values. The dictionary is passed as a parameter to the SnowflakeOperator.

I got the idea from a guide on Astronomer's website.

from datetime import datetime

from airflow import DAG

default_args = {
    "owner": "<my_name>",
    "depends_on_past": False,
    "start_date": datetime(2020, 3, 2),
    "email": ['<my_email>'],
    "email_on_failure": True,
    "retries": 0
}


dag = DAG(
    dag_id="etl_dag",  # a dag_id may only contain alphanumerics, dashes, dots and underscores
    default_args=default_args,
    schedule_interval="0 18 * * *",
    catchup=True,
    max_active_runs=1
)

with dag:
    ymd = '{{ ds_nodash }}'
    # Use ymd to determine if 1st or not
    ...

Solution

  • My practice is to use one operator to process the date and return the result, then use xcom_pull in another operator to get the returned value.

    In your case, for example:

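    from airflow.decorators import task

    @task
    def parse_dates(**kwargs):
        # logical_date is the run's execution date, read from the task context
        pt = kwargs['logical_date']
        # process_func is a placeholder for your own logic
        res = process_func(pt)
        return res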
    

    Then, in another task that needs res (the result of the parse_dates task):

    @task
    def one_task(**kwargs):
        _res = kwargs['ti'].xcom_pull(task_ids='parse_dates')
        # do whatever depends on _res
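
As a rough illustration of what process_func could look like for the question's case, here is a minimal sketch in plain Python. The function name build_params and the dictionary keys and values are hypothetical, chosen only for the example; it assumes the logical date behaves like a standard datetime, which is the case for the object Airflow passes in the task context.

```python
from datetime import datetime


def build_params(logical_date: datetime) -> dict:
    """Return a parameter dictionary that depends on the day of the month."""
    # Same YYYYMMDD format that the {{ ds_nodash }} template produces
    ymd = logical_date.strftime("%Y%m%d")
    if logical_date.day == 1:
        # Hypothetical values for the 1st of the month
        return {"run_date": ymd, "load_type": "monthly"}
    # Hypothetical values for every other day
    return {"run_date": ymd, "load_type": "daily"}


print(build_params(datetime(2020, 4, 1)))  # {'run_date': '20200401', 'load_type': 'monthly'}
print(build_params(datetime(2020, 4, 2)))  # {'run_date': '20200402', 'load_type': 'daily'}
```

Inside parse_dates, this would be called as build_params(kwargs['logical_date']), and the returned dictionary would then be available to downstream tasks through xcom_pull. Keeping the branching in a plain function also makes it easy to test without running Airflow.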