I am new to using xcoms in Airflow.
Here's what my code looks like:
# DAG definition as pasted in the question (indentation was lost in the post;
# the task definitions belong inside the `with DAG(...)` block).
with DAG(default_args=default_args,
max_active_runs=1,
dag_id="my_dag",
catchup=False,
start_date=start_date,
render_template_as_native_obj=True,
schedule_interval=None) as my_dag:
# Pod task whose result is pushed to XCom (do_xcom_push=True) for the
# downstream tasks to pull under task_ids='init_task'.
init_task = GKEStartPodOperator(
task_id="init_task",
name="init_task",
retries=0,
do_xcom_push=True,
project_id=PROJECT_ID,
location=REGION,
cluster_name=CLUSTER,
image=image,
arguments=[],
in_cluster=False,
is_delete_operator_pod=True,
startup_timeout_seconds=300,
namespace=NAMESPACE,
service_account_name=SA,
)
# NOTE(review): `lambda: do_task()` calls do_task with ZERO arguments —
# the op_kwargs above and any Airflow-injected context never reach it,
# so inside do_task kwargs is {} and kwargs['ti'] raises KeyError.
task_1 = PythonOperator(
task_id="task_1",
do_xcom_push=False,
op_kwargs={
'connector_key':"e_conns",
},
python_callable=lambda: do_task(),
)
# Same zero-argument lambda problem as task_1.
task_2 = PythonOperator(
task_id="task_2",
do_xcom_push=False,
op_kwargs={
'connector_key':"f_conns",
},
python_callable=lambda: do_task(),
)
init_task >> task_1 >> task_2
# Question's version: expects the TaskInstance under kwargs['ti'], but the
# tasks above invoke it via `lambda: do_task()` — a call with no arguments —
# so kwargs is empty here and the lookup below raises KeyError: 'ti'.
def do_task(**kwargs) -> int:
ti = kwargs['ti']
connectors = ti.xcom_pull(task_ids='init_task')[kwargs['connector_key']]
#remainder removed (not relevant)
I can see that the XCom push from init_task succeeds, but the do_task method is unable to pull the XCom values: it fails with KeyError: 'ti' on the line ti = kwargs['ti'].
I've seen numerous ways to handle XComs and I'm a little confused. I thought Airflow would inject the task instance into kwargs under the key 'ti', but the task instance never seems to reach the method called by the PythonOperator. Is there a way to get the task instance into the method?
(Airflow is version 2.4.3 on GCP Composer)
Well, I feel silly and a little embarrassed — it seems I didn't know Python (or Airflow) as well as I thought. Wrapping the callable as lambda: do_task() was the real problem: a zero-argument lambda invokes do_task() with nothing at all, discarding both op_kwargs and the context Airflow would otherwise pass in, so 'ti' never reached the function. Passing the function itself as python_callable (and declaring it before the DAG for readability) fixes it.
Code like this works fine:
# Working version: `ti` is a named parameter, so Airflow 2.x (which inspects
# the callable's signature and passes matching context variables) supplies the
# TaskInstance directly; op_kwargs ('connector_key') arrive via **kwargs.
def do_task(ti, **kwargs) -> int:
connectors = ti.xcom_pull(task_ids='init_task')[kwargs['connector_key']]
# Corrected DAG (indentation lost in the post; tasks belong inside the
# `with DAG(...)` block). do_task is now defined above and referenced
# directly, not wrapped in a lambda.
with DAG(default_args=default_args,
max_active_runs=1,
dag_id="my_dag",
catchup=False,
start_date=start_date,
render_template_as_native_obj=True,
schedule_interval=None) as my_dag:
# Pod task whose result is pushed to XCom for downstream pulls.
init_task = GKEStartPodOperator(
task_id="init_task",
name="init_task",
retries=0,
do_xcom_push=True,
project_id=PROJECT_ID,
location=REGION,
cluster_name=CLUSTER,
image=image,
arguments=[],
in_cluster=False,
is_delete_operator_pod=True,
startup_timeout_seconds=300,
namespace=NAMESPACE,
service_account_name=SA,
)
# The function object itself is passed, so the operator can forward
# op_kwargs and inject the TaskInstance into the `ti` parameter.
task_1 = PythonOperator(
task_id="task_1",
do_xcom_push=False,
op_kwargs={
'connector_key':"e_conns",
},
python_callable=do_task,
)
task_2 = PythonOperator(
task_id="task_2",
do_xcom_push=False,
op_kwargs={
'connector_key':"f_conns",
},
python_callable=do_task,
)
init_task >> task_1 >> task_2