I am brand new to Airflow and I am trying to understand how data is passed between tasks. The documentation for XCom (and a good article) helped me understand the basics of a 1:1 data flow.
Consider now a task_one that calls an API returning JSON: a large list of small dicts (after deserializing). I would like a task_two that somehow gets one of these dicts and processes it further. For performance/scaling reasons, I expect that 10 concurrent task_two instances would be perfect.
How should I engineer my DAG? Specifically, is there a way for task_two to "take the next XCom provided by task_one until there are none left"?
A generalization of this question would be: should task_two retrieve XCom data in a loop until the data is depleted, or should task_two just retrieve one piece of data and be restarted by the DAG until the data provided by task_one is depleted (in which case the DAG manages the startup of 10 task_two instances and the provision of data to each of them)?
It seems like Dynamic Task Mapping is really the answer for you.
Here is an example of how it works:
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

@task
def api_call():
    # pretend this calls the API and returns the deserialized list of dicts
    return [{"a": 1}, {"b": 2}, {"c": 3}]

@task(
    max_active_tis_per_dag=2  # if you want to control the number of concurrent tasks
)
def handle_output(arg):
    print(arg)

with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag:
    handle_output.expand(arg=api_call())
Airflow will split the list returned by api_call and create 3 mapped task instances, each with its own input. You can verify this by looking at the log output of one of the mapped tasks:
AIRFLOW_CTX_DAG_RUN_ID=manual__2023-09-17T22:13:18.891076+00:00
[2023-09-17, 22:13:21 UTC] {logging_mixin.py:137} INFO - {'a': 1}
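Mapped onto your scenario, the same pattern gives you the "10 concurrent task_two" behaviour without any manual looping over XComs. Below is a minimal sketch, assuming a hypothetical endpoint https://example.com/items that returns a JSON list of dicts and using requests for the call (the URL, the dag_id and the requests usage are illustrative, not part of your question):

from datetime import datetime

import requests

from airflow import DAG
from airflow.decorators import task

@task
def task_one():
    # hypothetical API call; returns the deserialized list of small dicts
    return requests.get("https://example.com/items").json()

@task(max_active_tis_per_dag=10)  # run at most 10 task_two instances at a time
def task_two(item):
    # process one dict from the list
    print(item)

with DAG(dag_id="api-fanout", start_date=datetime(2023, 9, 1)) as dag:
    task_two.expand(item=task_one())

Airflow creates one mapped task_two instance per dict, and the scheduler keeps at most 10 of them running at once. One caveat: the whole list travels through XCom, so it has to fit in your XCom backend.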