Tags: python, airflow, google-cloud-composer, airflow-xcom

Cannot access task instance to get xcom


I am new to using XComs in Airflow.

Here's what my code looks like:

with DAG(default_args=default_args,
         max_active_runs=1,
         dag_id="my_dag",
         catchup=False,
         start_date=start_date,
         render_template_as_native_obj=True,
         schedule_interval=None) as my_dag:

    init_task = GKEStartPodOperator(
        task_id="init_task",
        name="init_task",
        retries=0,
        do_xcom_push=True,
        project_id=PROJECT_ID,
        location=REGION,
        cluster_name=CLUSTER,
        image=image,
        arguments=[],
        in_cluster=False,
        is_delete_operator_pod=True,
        startup_timeout_seconds=300,
        namespace=NAMESPACE,
        service_account_name=SA,
    )


    task_1 = PythonOperator(
        task_id="task_1",
        do_xcom_push=False,
        op_kwargs={
            'connector_key':"e_conns",
        },
        python_callable=lambda: do_task(),
    )

    task_2 = PythonOperator(
        task_id="task_2",
        do_xcom_push=False,
        op_kwargs={
            'connector_key':"f_conns",
        },
        python_callable=lambda: do_task(),
    )
    init_task >> task_1 >> task_2

def do_task(**kwargs) -> int:
    ti = kwargs['ti']   
    connectors = ti.xcom_pull(task_ids='init_task')[kwargs['connector_key']]
    #remainder removed (not relevant)

I can see that the XCom push from init_task succeeds, but do_task is unable to pull the XCom values: it fails with KeyError: 'ti' on the line ti = kwargs['ti'].

I've seen numerous ways to handle XComs and I'm a little confused. I thought Airflow would inject the task instance into kwargs under the key ti, but the task instance never seems to reach the function called by the PythonOperator.

Is there a way to get the task instance into the method?

(Airflow is version 2.4.3 on GCP Composer)


Solution

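    Well, I feel silly and a little embarrassed. It seems I don't know Python as well as I thought I did.

    I had declared my functions after the DAG and wrapped the callable in a lambda. The lambda was the real problem: it takes no arguments, so Airflow passed it no context, and it then called do_task() with no keyword arguments, which is why kwargs['ti'] raised KeyError. Passing do_task directly as python_callable fixes that; the function then has to be defined before the DAG block, because its name is evaluated when the DAG file is parsed.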
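To see why wrapping the callable in a lambda matters, here is a simplified, pure-Python model of how a PythonOperator hands the task context (with op_kwargs merged in) to its callable: a callable that accepts **kwargs gets everything, otherwise only the keys matching its named parameters. The helper call_like_airflow is hypothetical and is not Airflow's actual implementation; the context values are placeholders.

```python
import inspect
from typing import Any, Callable, Dict


def call_like_airflow(python_callable: Callable[..., Any], context: Dict[str, Any]) -> Any:
    """Hypothetical, simplified model of how PythonOperator passes the task
    context (with op_kwargs merged in) to its callable. Not Airflow's code."""
    params = list(inspect.signature(python_callable).parameters.values())
    if any(p.kind == inspect.Parameter.VAR_KEYWORD for p in params):
        # The callable accepts **kwargs, so it receives the whole context.
        return python_callable(**context)
    # Otherwise only keys matching named parameters are passed.
    names = {p.name for p in params}
    return python_callable(**{k: v for k, v in context.items() if k in names})


def do_task_original(**kwargs):
    # The question's version: expects "ti" to arrive inside kwargs.
    return kwargs["ti"], kwargs["connector_key"]


def do_task_fixed(ti, **kwargs):
    # The answer's version: ti is a named parameter.
    return ti, kwargs["connector_key"]


# Placeholder context: a string stands in for the real TaskInstance, and
# connector_key plays the role of the op_kwargs entry.
context = {"ti": "<TaskInstance>", "ds": "2023-01-01", "connector_key": "e_conns"}

print(call_like_airflow(do_task_fixed, context))     # ('<TaskInstance>', 'e_conns')
print(call_like_airflow(do_task_original, context))  # ('<TaskInstance>', 'e_conns')

# The zero-argument lambda receives nothing and calls do_task_original()
# with no keyword arguments, reproducing the question's error.
try:
    call_like_airflow(lambda: do_task_original(), context)
except KeyError as exc:
    print(f"KeyError: {exc}")  # KeyError: 'ti'
```

In this model the question's original do_task(**kwargs) also works once it is passed directly; in Airflow 2 a callable that accepts **kwargs does receive the context too. Naming ti explicitly just makes the dependency visible in the signature.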

    Code like this works fine:

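    
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.providers.google.cloud.operators.kubernetes_engine import GKEStartPodOperator
    
    # default_args, start_date, PROJECT_ID, REGION, CLUSTER, image, NAMESPACE
    # and SA are defined elsewhere in the DAG file (not shown).
    
    
    # Defined before the DAG block so the name exists when it is passed as
    # python_callable below. Airflow passes the task instance because the
    # signature names `ti`; op_kwargs such as connector_key land in **kwargs.
    def do_task(ti, **kwargs) -> int:
        connectors = ti.xcom_pull(task_ids='init_task')[kwargs['connector_key']]
        # remainder removed (not relevant)
    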
    
    with DAG(default_args=default_args,
             max_active_runs=1,
             dag_id="my_dag",
             catchup=False,
             start_date=start_date,
             render_template_as_native_obj=True,
             schedule_interval=None) as my_dag:
    
        init_task = GKEStartPodOperator(
            task_id="init_task",
            name="init_task",
            retries=0,
            do_xcom_push=True,
            project_id=PROJECT_ID,
            location=REGION,
            cluster_name=CLUSTER,
            image=image,
            arguments=[],
            in_cluster=False,
            is_delete_operator_pod=True,
            startup_timeout_seconds=300,
            namespace=NAMESPACE,
            service_account_name=SA,
        )
    
    
        task_1 = PythonOperator(
            task_id="task_1",
            do_xcom_push=False,
            op_kwargs={
                'connector_key':"e_conns",
            },
            python_callable=do_task,
        )
    
        task_2 = PythonOperator(
            task_id="task_2",
            do_xcom_push=False,
            op_kwargs={
                'connector_key':"f_conns",
            },
            python_callable=do_task,
        )
        init_task >> task_1 >> task_2
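A side benefit of taking ti as a named parameter is that do_task can be exercised outside Airflow by passing a stand-in object. The sketch below assumes a hypothetical FakeTaskInstance that imitates only xcom_pull(task_ids=...), a made-up payload for init_task (the real shape of its XCom is not shown in the question), and a hypothetical remainder that returns the number of connectors.

```python
from typing import Any, Dict, Optional


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance in a unit test.
    Only xcom_pull(task_ids=...) is imitated."""

    def __init__(self, xcoms: Dict[str, Any]) -> None:
        self._xcoms = xcoms

    def xcom_pull(self, task_ids: Optional[str] = None) -> Any:
        # Return whatever the given task "pushed", or None if nothing was.
        return self._xcoms.get(task_ids)


def do_task(ti, **kwargs) -> int:
    connectors = ti.xcom_pull(task_ids="init_task")[kwargs["connector_key"]]
    # Hypothetical remainder: report how many connectors were found.
    return len(connectors)


# Hypothetical payload that init_task might push.
fake_ti = FakeTaskInstance(
    {"init_task": {"e_conns": ["conn_a", "conn_b"], "f_conns": ["conn_c"]}}
)
print(do_task(fake_ti, connector_key="e_conns"))  # 2
print(do_task(fake_ti, connector_key="f_conns"))  # 1
```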