Tags: python, airflow, google-cloud-composer

Passing return value from operator to following operator in Airflow


I'm trying to pass a list of strings to the source_objects field of GoogleCloudStorageToBigQueryOperator, but with the following code I get this error:

string indices must be integers, not unicode

Things I don't know:

  • How do I get the return value of the get_file_name task through XCom at the DAG scope?
  • How do I call the xcom_pull function at the DAG scope without having to provide a context? It seemed to me that task instances do not need a context for this.

Things I thought of:

  • Rewriting the operator so that it takes an XCom as an argument

What I want to do:

  • Call the existing operator as it is, passing it the file name returned by get_file_name

Also, some of the operator's fields seem to use a feature called template_fields. What is the mechanism behind template fields? Isn't it just for PythonOperator and BashOperator?
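As far as I understand it, template_fields is not specific to PythonOperator or BashOperator: it is a class attribute any operator can declare, listing the names of its attributes that Airflow renders as Jinja templates against the task's runtime context just before execute() runs, walking into lists and dicts along the way. The sketch below imitates that idea in plain Python. The names MiniOperator, MiniGcsToBq and render are hypothetical, and the stdlib string.Template (with $name placeholders) stands in for Jinja, so treat it as an illustration of the mechanism, not Airflow's actual code.

```python
from string import Template


def render(value, context):
    """Recursively render strings inside lists, tuples and dicts.

    string.Template is a stand-in for Jinja in this sketch.
    """
    if isinstance(value, str):
        return Template(value).safe_substitute(context)
    if isinstance(value, list):
        return [render(v, context) for v in value]
    if isinstance(value, tuple):
        return tuple(render(v, context) for v in value)
    if isinstance(value, dict):
        return {k: render(v, context) for k, v in value.items()}
    return value


class MiniOperator:
    # Attribute names to render before execute(); subclasses override this.
    template_fields = ()

    def render_templates(self, context):
        for name in self.template_fields:
            setattr(self, name, render(getattr(self, name), context))


class MiniGcsToBq(MiniOperator):
    # Only these attributes are templated; the others are left untouched.
    template_fields = ("bucket", "source_objects")

    def __init__(self, bucket, source_objects, write_disposition):
        self.bucket = bucket
        self.source_objects = source_objects
        self.write_disposition = write_disposition


op = MiniGcsToBq(bucket="src_bucket",
                 source_objects=["$file_name"],
                 write_disposition="$not_templated")
op.render_templates({"file_name": "data.csv", "not_templated": "ignored"})
print(op.source_objects)     # ['data.csv']
print(op.write_disposition)  # $not_templated
```

Because rendering happens per operator instance at run time, the runtime values (such as XComs) only need to exist when the task actually runs, not when the DAG file is parsed.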

And one last question: why doesn't PythonOperator return a TaskInstance?
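A way to think about it: PythonOperator(...) only builds a task definition when the DAG file is parsed, so nothing has run and no TaskInstance exists yet. A TaskInstance is created later, once per DAG run, and it is that run-time object that calls execute() and stores the callable's return value as an XCom under the key return_value. The toy model below sketches that split; PythonTask, TaskInstanceSketch and the in-memory XCOM_STORE dict are hypothetical simplifications (real XComs are stored in the metadata database and also keyed by dag_id, and the context key "conf" here replaces dag_run.conf).

```python
from datetime import date

# Hypothetical in-memory stand-in for the XCom table,
# keyed by (task_id, execution_date, key).
XCOM_STORE = {}


class PythonTask:
    """Task *definition*: created once, when the DAG file is parsed."""

    def __init__(self, task_id, python_callable):
        self.task_id = task_id
        self.python_callable = python_callable

    def execute(self, context):
        return self.python_callable(**context)


class TaskInstanceSketch:
    """One *run* of a task for one execution date; exists only at run time."""

    def __init__(self, task, execution_date):
        self.task = task
        self.execution_date = execution_date

    def run(self, context):
        result = self.task.execute(context)
        # The return value is pushed to XCom for this specific run.
        XCOM_STORE[(self.task.task_id, self.execution_date, "return_value")] = result
        return result

    def xcom_pull(self, task_ids, key="return_value"):
        return XCOM_STORE.get((task_ids, self.execution_date, key))


task = PythonTask("get_file_name", lambda **ctx: ctx["conf"]["fileName"])
run_date = date(2018, 1, 1)
TaskInstanceSketch(task, run_date).run({"conf": {"fileName": "data.csv"}})

# A downstream task instance of the same run can pull the value.
downstream_ti = TaskInstanceSketch(PythonTask("bq_load", lambda **ctx: None), run_date)
print(downstream_ti.xcom_pull(task_ids="get_file_name"))  # data.csv
```

At parse time there is no run and no execution date yet, so there is simply nothing for an xcom_pull at the DAG scope to find.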

with DAG('bq_load_file_from_cloud_function', default_args=default_args) as dag:

    def get_file_name_from_conf(ds, **kwargs):
        fileName = kwargs['dag_run'].conf['fileName']
        return [fileName]

    get_file_name = PythonOperator(
        task_id='get_file_name',
        provide_context=True,
        python_callable=get_file_name_from_conf)

    bq_load = GoogleCloudStorageToBigQueryOperator(
        task_id='bq_load', 
        bucket='src_bucket', 
        #source_objects=['data.csv'], 
        source_objects=get_file_name.xcom_pull(context='', task_ids='get_file_name'), 
        destination_project_dataset_table='project:dataset.table', 
        write_disposition='WRITE_EMPTY')

    bq_load.set_upstream(get_file_name)
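The error most likely comes from the xcom_pull call itself, which runs while the DAG file is being parsed. In Airflow 1.x, BaseOperator.xcom_pull(context, ...) essentially looks up context['ti'] and delegates to that TaskInstance; with context='' this becomes ''['ti'], and indexing a string with a string raises this TypeError (Python 2 phrases it as "not unicode"). The snippet reproduces that with operator_xcom_pull, a hypothetical stand-in that mimics the delegation; it is a simplification, not Airflow's source.

```python
def operator_xcom_pull(context, task_ids=None):
    # Hypothetical simplification of BaseOperator.xcom_pull in Airflow 1.x:
    # it expects the runtime context dict and delegates to the TaskInstance in it.
    return context["ti"].xcom_pull(task_ids=task_ids)


error_message = None
try:
    # At DAG-parse time there is no runtime context, so '' was passed instead.
    operator_xcom_pull(context="", task_ids="get_file_name")
except TypeError as exc:
    error_message = str(exc)
    print(error_message)  # exact wording varies by Python version
```

Even with a real context there would be nothing to pull at parse time, because get_file_name has not run yet; the value has to be pulled at run time instead, for example inside a templated field or inside another task's callable.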

I'm kind of new to Python and Airflow. I guess these kinds of things are supposed to be trivial, so I'm sure I've misunderstood something here.


Solution

  • After a lot of testing, I came up with this solution, thanks to tobi6, whose comment pointed me in the right direction. I had to use the template_fields feature.

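    When I tried to return a list containing a single string, I got concatenation errors, so I had to return a plain string from my XCom and then surround the templated XCom call with brackets to make the result a list. (A Jinja template renders to a string, so a returned list would come back as the text of the list rather than as a list.)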

    Here's the final code:

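    from airflow import DAG
    from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
    from airflow.operators.python_operator import PythonOperator

    # default_args is assumed to be defined earlier in the DAG file.

    with DAG('bq_load_file_from_cloud_function', default_args=default_args) as dag:

        def get_file_name_from_conf(ds, **kwargs):
            # Return a plain string; the list is built around the template below.
            return kwargs['dag_run'].conf['fileName']

        get_file_name = PythonOperator(
            task_id='get_file_name',
            provide_context=True,  # Airflow 1.x: pass the run context as kwargs
            python_callable=get_file_name_from_conf)

        bq_load = GoogleCloudStorageToBigQueryOperator(
            task_id='bq_load',
            bucket='src_bucket',
            # source_objects is a templated field, so this Jinja expression is
            # rendered at run time, when the upstream XCom already exists.
            source_objects=["{{ task_instance.xcom_pull(task_ids='get_file_name') }}"],
            destination_project_dataset_table='project:dataset.table',
            write_disposition='WRITE_APPEND')

        bq_load.set_upstream(get_file_name)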
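The reason the brackets belong in the Python code rather than in the returned value can be shown without Airflow. By default, a Jinja expression such as {{ task_instance.xcom_pull(...) }} writes its value into the output as text, so a list-valued XCom turns into the string form of the list. In the sketch below, str() stands in for that rendering step and render_expression is a hypothetical helper; it illustrates the general behavior rather than the exact cause of the concatenation errors.

```python
def render_expression(xcom_value):
    # Stand-in for rendering "{{ task_instance.xcom_pull(...) }}":
    # by default the expression's value ends up in the output as text.
    return str(xcom_value)


# If the callable returned a list, the rendered element is the list's text.
from_list = [render_expression(["data.csv"])]
# Solution: the callable returns a plain string and the brackets live in Python.
from_string = [render_expression("data.csv")]

print(from_list)    # ["['data.csv']"]  (one object literally named "['data.csv']")
print(from_string)  # ['data.csv']
```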