I am working on Airflow DAGs. I have two DAGs, DAG A and DAG B, and I want DAG B to act on the execution_date of DAG A. To pass the value between them I am using Airflow Variables.
In DAG A:
def set_execution_date(**kwargs):
    Variable.set('stream_execution_date', kwargs['execution_date'])
    status = Variable.get(time_status)
This updates my Airflow variable with the execution_date of DAG A.
Now I am using this value from the Airflow variable in DAG B:
def check_task_status(**kwargs):
    date = Variable.get('stream_execution_date')
    ti = get_task_instance('STREAMING_TEST', 'start_group', date)
DAG B throws an error with the above definition. The datetime value stored in the Airflow variable is treated as a string when it is read in DAG B; I would like to use it as a datetime value instead.
Any help is appreciated.
Airflow always stores a string when using Variable.set(). Depending on your needs you can get more precision, but an easy solution is to convert the execution_date datetime to an ISO 8601 string with isoformat() before storing it, and then convert it back to a datetime when reading the value. The code will look like this:
# Import section
from datetime import datetime
from airflow.models import Variable

# Functions section
def set_execution_date(**kwargs):
    # Variables are stored as strings, so serialize the datetime explicitly
    Variable.set('stream_execution_date', kwargs['execution_date'].isoformat())
    status = Variable.get(time_status)

def check_task_status(**kwargs):
    # Parse the ISO string back into a datetime (fromisoformat needs Python 3.7+)
    date = datetime.fromisoformat(Variable.get('stream_execution_date'))
    ti = get_task_instance('STREAMING_TEST', 'start_group', date)
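If you want to check the round trip outside of Airflow first, here is a minimal sketch. It assumes a plain dict as a stand-in for the Variable store; set_var, get_var, save_execution_date and load_execution_date are hypothetical helpers for illustration only, not Airflow APIs, and the date is an arbitrary example value.

```python
from datetime import datetime, timezone

# Hypothetical stand-in for Airflow's Variable store: everything is kept as a string.
_store = {}

def set_var(key, value):
    # Mimics the string-only storage described above.
    _store[key] = str(value)

def get_var(key):
    return _store[key]

def save_execution_date(execution_date):
    # Serialize to ISO 8601 so the value can be parsed back without guessing a format.
    set_var("stream_execution_date", execution_date.isoformat())

def load_execution_date():
    # fromisoformat() also restores the UTC offset, so the result stays timezone-aware.
    return datetime.fromisoformat(get_var("stream_execution_date"))

# Arbitrary timezone-aware example value standing in for kwargs['execution_date'].
original = datetime(2020, 5, 17, 8, 30, tzinfo=timezone.utc)
save_execution_date(original)
restored = load_execution_date()
```

Here restored compares equal to original and keeps its timezone, which is what a lookup by execution date needs in order to match.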