
How to pass value to Xcom in Airflow


last_updated_at = "{{ti.xcom_pull(task_ids='Match_Updated_date_{}', 
 key='QueryTimeStamp_{}')}}".format(country,country) 

This is the template string I am using to pull the value from XCom.

where last_updated_utc > '{}';

I am using it in the where clause shown above,

but the XCom value is not passed into my where clause correctly. This is what gets rendered:

 where last_updated_utc > '{ti.xcom_pull(task_ids='Match_Updated_date_mm', 
 key='QueryTimeStamp_mm')}';

It is passing the entire string instead of the value. How do I solve this?

.format(last_updated_at)

That's how I am passing it into the where clause.

When I was not using the parameter:

 my_xcom_value = "{{ti.xcom_pull(task_ids='Match_Updated_date', 
 key='QueryTimeStamp')}}"

The XCom worked fine, but now that I am passing a parameter, it no longer works.

Python Callable function pushing the xcom

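def match_dates(**kwargs):
    # Assumption: Airflow supplies the task instance in the context as 'ti'.
    ti = kwargs['ti']
    try:
        print("enters the try block")
        # s3 is presumably a boto3 S3 client created elsewhere in the DAG file.
        response = s3.get_object(Bucket='mygluecrawlerbucket', Key='DateTime/Users/my_date_{}.txt'.format(kwargs['key1']))
        print("response is ", response)
        status = response['ResponseMetadata']['HTTPStatusCode']
        if status == 200:
            print("Enters the status block ")
            data = response['Body'].read().decode("utf-8")
            # country and someVariable are defined elsewhere in the original code.
            ti.xcom_push(key="QueryTimeStamp_{}".format(country), value=someVariable)
    except Exception:
        # The except clause was not part of the posted snippet; re-raise as a placeholder.
        raise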

PostgresOperator pulling the XCom

import_redshift_table_zm = PostgresOperator(
        task_id='copy_data_from_redshift_zm',
        postgres_conn_id='postgres_default',
        sql="""
                BEGIN;

            create table angaza_public_spark.stag_angaza_users_zm as
            Select * FROM angaza_public_zm.users
            where last_updated_utc > '{}';
              END;

    """.format("{{ti.xcom_pull(task_ids='Match_Updated_dates_zm', key='QueryTimeStamp_{}')}}".format(country))
)


Solution

  • Two things:

    1. Replace this
    {{ti.xcom_pull(task_ids='Match_Updated_date_{}', 
     key='QueryTimeStamp_{}')}}
    

    with this (notice the space after {{ and before }}):

    {{ ti.xcom_pull(task_ids='Match_Updated_date_{}', 
     key='QueryTimeStamp_{}') }}
    
    2. It depends on where you are using the last_updated_at parameter. Each operator in Airflow has a class variable named template_fields, which lists the attributes that are rendered as Jinja templates.

    If you are using a custom operator, make sure you add last_updated_at to template_fields.

    For example:

    template_fields = ('templates_dict', 'op_args', 'op_kwargs')
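
One more thing that may be worth checking, since the rendered where clause in the question shows single braces: Python's str.format treats {{ and }} as escapes for literal braces, so calling .format on the template string collapses the Jinja delimiters before Airflow gets to render the SQL. The following is a minimal sketch of that behaviour, not a confirmed fix for the original DAG. It reuses the task id and key patterns from the question and assumes a hypothetical country value of "zm". Doubling the braces once more is one way to keep them.

```python
# Hypothetical value; in the question this presumably comes from the DAG code.
country = "zm"

# str.format turns "{{" into "{" and "}}" into "}", so the Jinja
# delimiters are collapsed to single braces.
collapsed = "{{ ti.xcom_pull(task_ids='Match_Updated_date_{}', key='QueryTimeStamp_{}') }}".format(country, country)
print(collapsed)

# Writing four braces leaves "{{ ... }}" in the formatted result.
preserved = "{{{{ ti.xcom_pull(task_ids='Match_Updated_date_{0}', key='QueryTimeStamp_{0}') }}}}".format(country)
print(preserved)

# Values inserted by .format are not re-parsed, so the preserved template
# survives being placed into the where clause.
where_clause = "where last_updated_utc > '{}';".format(preserved)
print(where_clause)
```

With the double braces intact, the expression can be rendered at run time, because sql is one of the templated fields of PostgresOperator.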