BACKGROUND
Airflow version = 2.3.4, Python version = 3.8.
I'm trying to develop a DAG in Google Cloud Composer that instantiates exactly "n" tasks (DataflowOperator), where "n" is the number of files loaded into a Google Cloud Storage bucket.
APPROACH
I've tried to implement this using dynamic task mapping in Airflow: a function with a task decorator executes one DataflowOperator per element of the list passed into it.
ERROR
...
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1390, in xcom_push
context['ti'].xcom_push(key=key, value=value, execution_date=execution_date)
KeyError: 'ti'
{taskinstance.py:1408} INFO - Marking task as FAILED.
...
CODE
...
with models.DAG(
    DAG_NAME,
    default_args=DEFAULT_ARGS,
    schedule_interval=None,
    start_date=datetime.now(),
    catchup=False,
) as dag:

    start = dummy.DummyOperator(task_id='start', trigger_rule='all_success')

    @task
    def dynamic_task_test(sample_list):
        dataflow_operator = DataflowStartFlexTemplateOperator(
            task_id='dataflow_task_id',
            project_id='project_id',
            location='location',
            body={'some_parameters': f'{sample_list}'},
        )
        dataflow_operator.execute(dict())

    start >> dynamic_task_test.expand(sample_list=['A', 'B', 'C'])
SCENARIO
Whenever I try to execute the code above, the DAG starts and creates the mapped tasks, BUT it then throws an error (error code and message snippet above), failing the tasks and therefore the whole DAG.
QUESTIONS
What can be causing the error above, and how can I resolve it?

ANSWER
The problem with the @task approach is that you have to execute the operator inside the decorated method, and that call to the operator's execute() method is what causes the error: execute() receives an empty dict instead of a real task-instance context, so the context['ti'] lookup inside xcom_push raises KeyError: 'ti'.
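For illustration, here is a minimal, untested sketch of one way the original @task approach could be patched: get_current_context() from airflow.operators.python hands the operator the real runtime context, which does contain the 'ti' entry. This only shows the mechanism of the failure, not the approaches I propose below:

from airflow.decorators import task
from airflow.operators.python import get_current_context
from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator

@task
def dynamic_task_test(sample_list):
    dataflow_operator = DataflowStartFlexTemplateOperator(
        task_id='dataflow_task_id',
        project_id='project_id',
        location='location',
        body={'some_parameters': f'{sample_list}'},
    )
    # dataflow_operator.execute(dict()) fails because the empty dict has no 'ti' key,
    # so the xcom_push call inside the operator raises KeyError: 'ti'.
    # Passing the real runtime context supplies 'ti', although running an operator
    # manually inside a decorated task remains fragile.
    dataflow_operator.execute(context=get_current_context())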
I propose two other approaches.

Solution 1: use a classical for loop in the DAG file and launch the Dataflow jobs in parallel, example:
import airflow
from airflow.operators import dummy
from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator

with airflow.DAG(
    "dag_id", default_args={}, schedule_interval=None,
) as dag:

    start = dummy.DummyOperator(task_id='start', trigger_rule='all_success')

    sample_list = ['A', 'B', 'C']

    for element in sample_list:
        dataflow_operator = DataflowStartFlexTemplateOperator(
            task_id=f'dataflow_task_id_{element}',
            project_id='project_id',
            location='location',
            body={'some_parameters': element},
        )

        start >> dataflow_operator
It will execute each Dataflow job in parallel and represent each Dataflow task as a separate node in the DAG.
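If the list has to come from the files in the Cloud Storage bucket mentioned in the question, it could be built when the DAG file is parsed, for example with the google-cloud-storage client. This is a sketch under the assumption that the bucket name (here the hypothetical 'my-input-bucket') is known statically; note that the listing then runs on every DAG parse:

from google.cloud import storage

def list_gcs_files(bucket_name: str, prefix: str = '') -> list:
    # List object names in the bucket at DAG parse time (bucket and prefix are placeholders).
    client = storage.Client()
    return [blob.name for blob in client.list_blobs(bucket_name, prefix=prefix)]

sample_list = list_gcs_files('my-input-bucket')  # replaces the hard-coded ['A', 'B', 'C']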
Solution 2: use a task group that builds the list of DataflowStartFlexTemplateOperator. It's similar to solution 1 but more elegant in the DAG graph: it is rendered as a single task group node that can be expanded, and when expanded it shows all the Dataflow tasks inside. Example:
from typing import List

import airflow
from airflow.decorators import task_group
from airflow.operators import dummy
from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator


@task_group()
def launch_dataflow_operators(sample_list: List[str]) -> List[DataflowStartFlexTemplateOperator]:
    return list(map(to_dataflow_operator, sample_list))


def to_dataflow_operator(element) -> DataflowStartFlexTemplateOperator:
    return DataflowStartFlexTemplateOperator(
        task_id=f'dataflow_task_id_{element}',
        project_id='project_id',
        location='location',
        body={'some_parameters': element},
    )


with airflow.DAG(
    "dag_id", default_args={}, schedule_interval=None,
) as dag:

    start = dummy.DummyOperator(task_id='start', trigger_rule='all_success')

    sample_list = ['A', 'B', 'C']

    start >> launch_dataflow_operators(sample_list)
The @task_group decorated method returns the list of DataflowStartFlexTemplateOperator, and the DAG wires the group up with: start >> launch_dataflow_operators(sample_list).
I didn't have time to run the second solution on my side, but the idea is there.