Tags: python, docker, airflow, airflow-taskflow, airflow-xcom

[Airflow]: Dynamic Task Mapping on DockerOperator using XComs


I am creating a DAG that should do the following:

  • fetch event ids
  • for each event id, fetch the event details (DockerOperator)

The code below is my attempt to do what I want:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator


with DAG(
    start_date=datetime(2024, 11, 1),
    schedule="@daily",
):
    task_fetch_ids = PythonOperator(
        task_id="task_fetch_ids",
        ...)


    task_fetch_detail = DockerOperator(
        task_id="fetch_detail",
        image="image:v1",
        ).expand(
            command=[f"fetch-event --event-id  {event_id}" for event_id in "{{ ti.xcom_pull(task_ids='task_fetch_ids', key='return_value') }}"]
        )


    task_fetch_ids >> task_fetch_detail


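The above clearly doesn't work: the Jinja template is only rendered at run time, so when the DAG file is parsed the comprehension loops over the characters of the template string rather than over the event ids. What is the correct syntax?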
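
The failure can be reproduced in plain Python, with no Airflow involved. This is only a minimal sketch; the variable names `template` and `commands` are illustrative. When the DAG file is parsed, the Jinja expression is still an ordinary string, so the comprehension produces one command per character:

```python
# At parse time the Jinja expression has not been rendered; it is a plain str.
template = "{{ ti.xcom_pull(task_ids='task_fetch_ids', key='return_value') }}"

# Iterating over a str yields its characters, not event ids.
commands = [f"fetch-event --event-id {event_id}" for event_id in template]

# One command was built per character of the template.
print(len(commands) == len(template))
# The first "event id" is just the opening brace of the template.
print(commands[0])
```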


Solution

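  • You must reshape the XCom return value into the list that the mapped operator's expand() argument expects. Build that list in an upstream task, then pass that task's .output (an XComArg, resolved at run time) to expand() on DockerOperator.partial(...) instead of looping over a Jinja template: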

    
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.providers.docker.operators.docker import DockerOperator


    def fn_get_work():
        # Stand-in for the real lookup: return the list of event ids.
        return ["a", "b", "c"]


    def fn_build(work):
        # Turn each event id into the command string for one container.
        return [f"fetch-event --event-id {i}" for i in work]


    with DAG(
        dag_id="docker_dag",
        schedule=None,
        start_date=datetime(2024, 11, 1),
    ):
        get_work_task = PythonOperator(
            task_id="get_work",
            python_callable=fn_get_work,
        )

        # Passing .output (an XComArg) defers resolution to run time and
        # also makes build_work depend on get_work.
        build_work_task = PythonOperator(
            task_id="build_work",
            python_callable=fn_build,
            op_kwargs={"work": get_work_task.output},
        )

        # partial() holds the arguments shared by every mapped task;
        # expand() creates one task instance per element of the list.
        run_work_task = DockerOperator.partial(
            task_id="run_work",
            image="alpine:3.16.2",
        ).expand(command=build_work_task.output)

        get_work_task >> build_work_task >> run_work_task
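
The command-building step is ordinary Python, so it can be checked without a scheduler or Docker. The sketch below uses a hypothetical helper, `build_commands`, that mirrors `fn_build`. Quoting each id with `shlex.quote` is an added assumption, useful only if ids might contain spaces or shell metacharacters; it is not part of the answer above.

```python
import shlex


def build_commands(event_ids):
    """Return one command string per event id (hypothetical helper)."""
    # str() lets numeric ids through; shlex.quote leaves simple ids unchanged
    # and wraps ids containing spaces or shell metacharacters in quotes.
    return [f"fetch-event --event-id {shlex.quote(str(i))}" for i in event_ids]


commands = build_commands(["a", "b", 42])
for command in commands:
    print(command)
```

When a list like this is passed to `expand(command=...)`, each element becomes the `command` of one mapped task instance.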