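I am creating an Airflow DAG that should do the following: fetch a list of event IDs in one task, then run one Docker task per event ID to fetch that event's details.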
The code below is my attempt to do what I want:
from datetime import datetime
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
# Asker's non-working attempt, reproduced as posted.
# NOTE(review): `With Dag(` is not valid Python as written (the keyword is
# lowercase `with`), and no `Dag`/`DAG` name is imported in the visible
# import lines of this snippet.
With Dag(
start_date=datetime(2024, 11, 1),
schedule="@daily",
):
task_fetch_ids = PythonOperator(
# NOTE(review): this task_id is the same string as the DockerOperator's
# task_id below, while the xcom_pull further down asks for
# task_ids='task_fetch_ids'.
task_id="fetch_detail",
...)
task_fetch_detail = DockerOperator(
task_id="fetch_detail",
image="image:v1",
).expand(
# The comprehension below runs when the file is parsed and iterates over the
# characters of the Jinja template string literal itself, not over the list
# returned by the upstream task.
command=[f"fetch-event --event-id {event_id}" for event_id in "{{ ti.xcom_pull(task_ids='task_fetch_ids', key='return_value') }}"]
)
# Run the ID-fetching task before the per-event Docker task.
task_fetch_ids >> task_fetch_detail
The above clearly doesn't work, because the list comprehension loops over the characters of the template string rather than over the list returned by the upstream task. What is the correct syntax?
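You must convert the XCom return value into the argument form that the dynamically mapped operator expects: build the list of commands in an upstream task, then pass that task's output to `expand()`: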
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.utils.dates import days_ago
# DAG that is never scheduled automatically (schedule=None); it only runs
# when triggered. A fixed start_date is used instead of days_ago(1):
# Airflow's guidance is to avoid dynamic start dates, and `schedule`
# replaces the deprecated `schedule_interval` keyword.
dag = DAG(
    dag_id="docker_dag",
    schedule=None,
    start_date=datetime(2024, 11, 1),
)
with dag:

    def fn_get_work():
        """Return the work items to fan out over (hard-coded sample IDs)."""
        return ["a", "b", "c"]

    get_work_task = PythonOperator(
        task_id="get_work",
        python_callable=fn_get_work,
    )

    def fn_build(work):
        """Build one container command per work item.

        Args:
            work: Iterable of event IDs; here, the return value of the
                ``get_work`` task.

        Returns:
            A list of ``fetch-event --event-id <id>`` command strings, one
            per item, in input order.
        """
        return [f"fetch-event --event-id {event_id}" for event_id in work]

    # `get_work_task.output` is a reference to the upstream task's return
    # value; passing it through op_kwargs hands the actual list to fn_build
    # when the task runs, rather than at DAG-parse time.
    build_work_task = PythonOperator(
        task_id="build_work",
        python_callable=fn_build,
        op_kwargs={"work": get_work_task.output},
    )

    # partial() carries the arguments shared by every mapped task instance;
    # expand() maps over the command list, one container run per command.
    run_work_task = DockerOperator.partial(
        task_id="run_work",
        image="alpine:3.16.2",
    ).expand(command=build_work_task.output)

    # Explicit ordering: fetch IDs -> build commands -> run containers.
    get_work_task >> build_work_task >> run_work_task