Tags: python, dynamic, directed-acyclic-graphs, google-cloud-composer, airflow-2.x

How to pass "context" into the .execute() method of an Operator? (KeyError: 'ti')


BACKGROUND
Airflow version = 2.3.4, Python version = 3.8.
I'm trying to develop a DAG in Google Cloud Composer that instantiates exactly "n" tasks (Dataflow operators), where "n" is the number of files loaded into a Google Cloud Storage bucket.

APPROACH
I've tried to implement this with Airflow's dynamic task mapping: a function with a task decorator that runs a DataflowOperator for each element of the list it is expanded over.

ERROR

...
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1390, in xcom_push
    context['ti'].xcom_push(key=key, value=value, execution_date=execution_date)
KeyError: 'ti'
{taskinstance.py:1408} INFO - Marking task as FAILED.
...

CODE

...
with models.DAG(
    DAG_NAME,
    default_args=DEFAULT_ARGS,
    schedule_interval=None,
    start_date=datetime.now(),
    catchup=False,
) as dag:
    start = dummy.DummyOperator(task_id='start', trigger_rule='all_success')

    @task
    def dynamic_task_test(sample_list):
        dataflow_operator = DataflowStartFlexTemplateOperator(
            task_id = 'dataflow_task_id',
            project_id = 'project_id',
            location = 'location',
            body = { 'some_parameters' : f'{sample_list}'},
        )
        dataflow_operator.execute(dict())

    start >> dynamic_task_test.expand(sample_list=['A', 'B', 'C'])

SCENARIO
Whenever I try to execute the code above, the DAG is able to:

  • create the right number of mapped instances (based on the length of the list)
  • queue, start, and run the right amount of Dataflow jobs

BUT it throws the error shown in the snippet above, failing the tasks and therefore the whole DAG. I'm trying to find out what causes this error and how to resolve it.

QUESTIONS

  1. What do the error code and message mean?
  2. What must I do correctly to resolve the issue?
  3. How should I properly pass the "context" into the .execute() method of the Operator?
  4. Do you have any advice or idea on how I can do it differently?

Solution

  • The problem with @task is that you instantiate the operator inside the decorated function and call its .execute() method yourself with an empty dict. The operator then tries to push its result via context['ti'].xcom_push(...), finds no task instance in the context, and raises KeyError: 'ti' (that answers questions 1 and 2: the context you pass must contain a real task instance). If you want to keep this pattern, see the sketch just below.
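
    A minimal sketch, assuming you stay with the @task approach (untested here): fetch the live runtime context with get_current_context() and pass it to execute() instead of an empty dict:

    from airflow.decorators import task
    from airflow.operators.python import get_current_context
    from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator


    @task
    def dynamic_task_test(sample_list):
        # get_current_context() is only usable while the task is running; it
        # returns the same context dict (including 'ti') that Airflow passes
        # to operators, so the operator's internal xcom_push() can find the
        # task instance.
        context = get_current_context()

        dataflow_operator = DataflowStartFlexTemplateOperator(
            task_id='dataflow_task_id',
            project_id='project_id',
            location='location',
            body={'some_parameters': f'{sample_list}'},
        )
        dataflow_operator.execute(context)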

    That said, I propose two other approaches:

    Solution 1:

    You can use a classical for loop and launch the Dataflow jobs in parallel, for example:

    from datetime import datetime

    import airflow
    from airflow.operators import dummy
    from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator

    with airflow.DAG(
            "dag_id",
            default_args={},
            schedule_interval=None,
            start_date=datetime(2022, 1, 1),  # a start_date is required for Airflow to run the tasks
            catchup=False,
    ) as dag:
        start = dummy.DummyOperator(task_id='start', trigger_rule='all_success')

        sample_list = ['A', 'B', 'C']

        # One DataflowStartFlexTemplateOperator per element, created at parse time.
        for element in sample_list:
            dataflow_operator = DataflowStartFlexTemplateOperator(
                task_id=f'dataflow_task_id_{element}',  # task_ids must be unique
                project_id='project_id',
                location='location',
                body={'some_parameters': element},
            )

            start >> dataflow_operator
    

    This launches the Dataflow jobs in parallel and represents each job as its own task in the DAG graph.
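
    Note that the for loop runs when the DAG file is parsed, not when the DAG runs, so sample_list must be computable at parse time. If the list has to reflect the files present in the GCS bucket at run time, you'll need dynamic task mapping instead (see the sketch at the end of this answer).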

    Solution 2:

    Use a task group that builds the list of DataflowStartFlexTemplateOperator tasks. It's similar to Solution 1 but more elegant in the DAG graph.

    The group appears as a single expandable node in the graph; when you expand it, you see all the Dataflow tasks inside.

    Example:

    from datetime import datetime
    from typing import List

    import airflow
    from airflow.decorators import task_group
    from airflow.operators import dummy
    from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator


    @task_group()
    def launch_dataflow_operators(sample_list: List[str]) -> List[DataflowStartFlexTemplateOperator]:
        # Operators instantiated inside the decorated function are attached to
        # the task group; their task_ids are prefixed with the group id.
        return list(map(to_dataflow_operator, sample_list))


    def to_dataflow_operator(element) -> DataflowStartFlexTemplateOperator:
        return DataflowStartFlexTemplateOperator(
            task_id=f'dataflow_task_id_{element}',
            project_id='project_id',
            location='location',
            body={'some_parameters': element},
        )


    with airflow.DAG(
            "dag_id",
            default_args={},
            schedule_interval=None,
            start_date=datetime(2022, 1, 1),  # a start_date is required for Airflow to run the tasks
            catchup=False,
    ) as dag:
        start = dummy.DummyOperator(task_id='start', trigger_rule='all_success')

        sample_list = ['A', 'B', 'C']

        # Calling the decorated function creates the group and returns the list
        # of operators, which can be wired up like any other task.
        start >> launch_dataflow_operators(sample_list)
    
    • The @task_group-decorated function returns the list of DataflowStartFlexTemplateOperator tasks
    • We can then wire it up like a regular task in the DAG: start >> launch_dataflow_operators(sample_list)

    I didn't have time to run the second solution on my side, but the idea is there.
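
    Finally, to answer question 4 with a third option: since you're on Airflow 2.3, classic operators can be mapped directly with dynamic task mapping (.partial() / .expand()), so you never call .execute() yourself and each mapped instance gets its own task-instance context. A minimal sketch, which I haven't run, assuming DataflowStartFlexTemplateOperator maps like any other operator:

    from datetime import datetime

    import airflow
    from airflow.operators import dummy
    from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator

    with airflow.DAG(
            "dag_id",
            default_args={},
            schedule_interval=None,
            start_date=datetime(2022, 1, 1),
            catchup=False,
    ) as dag:
        start = dummy.DummyOperator(task_id='start', trigger_rule='all_success')

        # Constant arguments go in partial(); the argument that varies per task
        # goes in expand(). Airflow creates one mapped task instance per element.
        dataflow_tasks = DataflowStartFlexTemplateOperator.partial(
            task_id='dataflow_task_id',
            project_id='project_id',
            location='location',
        ).expand(
            body=[{'some_parameters': element} for element in ['A', 'B', 'C']],
        )

        start >> dataflow_tasks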