How can I extract the hour of an Airflow DAG execution date as a variable and perform manipulations on it (such as multiplication by a factor or subtracting from a value) while ensuring that the DAG can be rerun with a different execution date? I currently write
EXEC_HOUR = '{{ execution_date.strftime("%H") }}'
to extract the hour as a variable, but I cannot perform any logic with it.
so for example I can not do:
NEW_VAR = EXEC_HOUR * -1
the way that i found to do it is to put the logic in a method use it in a PythonOperator like that:
my_logic_task = PythonOperator(
task_id='logic_task',
provide_context=True,
python_callable=logic_method,
dag=dag
)
when the method is something like that:
def logic_method(**kwargs):
execution_date = kwargs['execution_date']
return -execution_date.hour if execution_date.hour < 12 else 24 - execution_date.hour
after that you can create a variable MY_ANSWER
like that:
MY_ANSWER = "{{ task_instance.xcom_pull(task_ids='logic_task') }}"
and pass it in your KubernetesPodOperator arguments.
Without using PythonOperator the argument it will look like this:
[<function logic_method at 0xffff937ee830>]