Tags: python, airflow, jinja2

How to extract and manipulate the hour of an Airflow DAG execution date as a variable?


How can I extract the hour of an Airflow DAG's execution date as a variable and do arithmetic with it (such as multiplying it by a factor or subtracting it from a value), while still making sure the DAG can be rerun with a different execution date? Currently I write

    EXEC_HOUR = '{{ execution_date.strftime("%H") }}'

to extract the hour as a variable, but I cannot perform any logic with it.

For example, I cannot do:

    NEW_VAR = EXEC_HOUR * -1
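A minimal sketch of why this fails, in plain Python with no Airflow needed: when the DAG file is parsed, EXEC_HOUR is only the literal template string, because Airflow renders Jinja later and only inside an operator's templated fields. The date 2021-05-01 07:30 below is an arbitrary example value, not something from the question.

```python
from datetime import datetime

# At DAG-parse time this is just a literal string; Jinja has not run yet.
EXEC_HOUR = '{{ execution_date.strftime("%H") }}'

# str * int repeats the string, and a negative count yields an empty string,
# so this "works" but produces '' instead of a negated hour.
NEW_VAR = EXEC_HOUR * -1
print(repr(NEW_VAR))  # ''

# Even once rendered, strftime("%H") gives a zero-padded string,
# so arithmetic needs int(), or simply use the .hour attribute.
execution_date = datetime(2021, 5, 1, 7, 30)  # example date (assumption)
rendered = execution_date.strftime("%H")      # '07'
print(int(rendered) * -1)                     # -7
print(-execution_date.hour)                   # -7
```

Simple arithmetic can also be written inside the template itself, e.g. '{{ execution_date.hour * -1 }}', but by default the rendered result is still a string handed to the operator, not a Python int you can reuse in the DAG file.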


Solution

  • The way I found to do it is to put the logic in a function and run it with a PythonOperator, like this:

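        # Airflow 1.10.x import path; on Airflow 2.x use airflow.operators.python
        from airflow.operators.python_operator import PythonOperator

        my_logic_task = PythonOperator(
            task_id='logic_task',
            provide_context=True,  # Airflow 1.x only; Airflow 2.x passes the context automatically
            python_callable=logic_method,
            dag=dag
        )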
    

    where logic_method (defined above the operator in the DAG file) is something like this:

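        def logic_method(**kwargs):
            # Airflow passes the task context as keyword arguments.
            execution_date = kwargs['execution_date']
            # Hours 0-11 become negative; hours 12-23 become 24 - hour.
            # The return value is pushed to XCom automatically.
            return -execution_date.hour if execution_date.hour < 12 else 24 - execution_date.hour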
    
    

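    After that, you can create a templated string MY_ANSWER that pulls the task's return value from XCom:

        MY_ANSWER = "{{ task_instance.xcom_pull(task_ids='logic_task') }}"

    and pass it in the arguments of your KubernetesPodOperator. Because arguments is a templated field, the string is rendered at run time for whichever execution date the run uses, so a rerun picks up the new hour. Make sure the pod task runs downstream of logic_task (for example my_logic_task >> your_pod_task), otherwise there is no XCom value to pull yet. Note that the rendered value is a string, such as "-5".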

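    If you put logic_method directly in the arguments instead of running it through a PythonOperator, the function is never called, and the argument ends up as the function's string representation:

        [<function logic_method at 0xffff937ee830>]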
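To check the hour mapping without a running Airflow, here is a minimal sketch that calls logic_method directly with a hand-built context. The fake_context dict is a hypothetical stand-in for Airflow's real context (only execution_date is set), and the dates are arbitrary; Airflow actually passes a pendulum datetime, which has the same .hour attribute.

```python
from datetime import datetime


def logic_method(**kwargs):
    # Same logic as the answer; Airflow would pass the task context as kwargs.
    execution_date = kwargs['execution_date']
    # Hours 0-11 become negative; hours 12-23 become 24 - hour.
    return -execution_date.hour if execution_date.hour < 12 else 24 - execution_date.hour


# Hypothetical stand-in for Airflow's context: only 'execution_date' is set.
results = {}
for hour in (0, 5, 11, 12, 18, 23):
    fake_context = {'execution_date': datetime(2021, 5, 1, hour)}
    results[hour] = logic_method(**fake_context)

print(results)  # {0: 0, 5: -5, 11: -11, 12: 12, 18: 6, 23: 1}

# Passing the function object itself never calls it; converting it to a
# string only gives its repr, like the output shown in the answer.
print(str([logic_method]))
```

Keeping the logic in a plain function like this also makes it easy to unit-test separately from the DAG.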