Tags: airflow, airflow-taskflow

How to dispatch multiple XComs from one task to another?


I am brand new to Airflow and am trying to understand data dispatch between tasks. The documentation for XCom (and a good article) helped me understand the basics of a 1:1 data flow.

Now consider task_one, which calls an API that returns JSON: a large list of small dicts (after deserialization). I would like task_two to somehow receive one of these dicts and process it further. For performance/scaling reasons, I expect that 10 concurrent task_two instances would be ideal.

How should I engineer my DAG? Specifically, is there a way to

  • tell task_two "take the next XCom provided by task_one until there are none left"?
  • limit the number of concurrent task_two instances?

A generalization of this question would be:

  • should the logic in task_two account for retrieving XCom data in a loop until the data is depleted ...
  • ... OR is there a built-in, recommended way to set up the DAG so that task_two retrieves just one piece of data and is restarted by the DAG until the data provided by task_one is depleted? (in which case the DAG manages the startup of 10 task_two instances and the provision of data to each of them)

Solution

  • It seems like Dynamic Task Mapping is really the answer for you.

    Here is an example of how it works:

    from datetime import datetime

    from airflow import DAG
    from airflow.decorators import task


    @task
    def api_call():
        # stand-in for the real API call that returns the deserialized JSON list
        return [{"a": 1}, {"b": 2}, {"c": 3}]


    @task(
        max_active_tis_per_dag=2  # caps how many mapped instances run concurrently
    )
    def handle_output(arg):
        print(arg)


    with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag:
        handle_output.expand(arg=api_call())
    

    Airflow will expand the list returned by api_call and create 3 mapped instances of handle_output, one per element, each receiving the appropriate input.

    [Screenshot: the DAG's Graph View, showing handle_output mapped into three task instances]

    You can verify this by looking at the task logs:

    AIRFLOW_CTX_DAG_RUN_ID=manual__2023-09-17T22:13:18.891076+00:00
    [2023-09-17, 22:13:21 UTC] {logging_mixin.py:137} INFO - {'a': 1}
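
    If the mapped task also needs arguments that are the same for every instance, you can combine partial() (for the fixed arguments) with expand() (for the mapped one). A minimal sketch, reusing the imports and api_call from above; the prefix parameter is made up for illustration:

    @task
    def handle_output(arg, prefix):
        print(prefix, arg)


    with DAG(dag_id="dynamic-map-partial", start_date=datetime(2022, 4, 2)) as dag:
        # prefix is passed unchanged to every mapped instance,
        # while arg is expanded into one task instance per list element
        handle_output.partial(prefix="item:").expand(arg=api_call())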
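
    The return values of a mapped task can also be gathered by a single downstream task once all instances finish (the "reduce" half of map/reduce). Another minimal sketch under the same assumptions; here handle_output is changed to return a number, and the summarize task is made up for illustration:

    @task
    def handle_output(arg):
        # return the dict's single value instead of just printing it
        return next(iter(arg.values()))


    @task
    def summarize(results):
        # receives all mapped results as a lazily-resolved sequence
        print(sum(results))


    with DAG(dag_id="dynamic-map-reduce", start_date=datetime(2022, 4, 2)) as dag:
        summarize(handle_output.expand(arg=api_call()))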