Search code examples
pythondatetimeairflowpython-datetimedirected-acyclic-graphs

Datetime passed to a variable is treated as a string - Python


I am working on Airflow DAGs. I have two DAGs say DAG A and DAG B. I want to do some stuff in DAG B based on execution_date of DAG A. For this I am making use of Airflow variables.

In DAG A:

def set_execution_date(**kwargs):
    Variable.set('var_name',kwargs['execution_date'])
    status = Variable.get(time_status)

This updates my airflow variable with execution_date of DAG A like below:

enter image description here

Now I am using this value from airflow variable in DAG B:

def check_task_status(**kwargs):
    date= Variable.get('stream_execution_date')
    ti = get_task_instance('STREAMING_TEST', 'start_group', date)

My DAG B throws the below error for the above definition:

enter image description here

The datetime value stored in airflow variable is treated as a string when used in DAG B. Instead I would like to use it as a datetime value.

Any help is appreciated.


Solution

  • Airflow always stores a string when using Variable.set(). Depending on your needs you can get more precision, but an easy solution will be to transform execution_date datetime into isoformat and then convert it back to datetime when getting the value. Code will look like this:

    # Import section
    from datetime import datetime
    
    # Functions section
    def set_execution_date(**kwargs):
        Variable.set('stream_execution_date', kwargs['execution_date'].isoformat())
        status = Variable.get(time_status)
    
    def check_task_status(**kwargs):
        date = datetime.fromisoformat(Variable.get('stream_execution_date'))
        ti = get_task_instance('STREAMING_TEST', 'start_group', date)