last_updated_at = "{{ti.xcom_pull(task_ids='Match_Updated_date_{}', key='QueryTimeStamp_{}')}}".format(country, country)
This is the value I am saving in my xcom
where last_updated_utc > '{}';
And I am using it in the where clause,
but the XCom is passed incorrectly into the where clause:
where last_updated_utc > '{ti.xcom_pull(task_ids='Match_Updated_date_mm',
key='QueryTimeStamp_mm')}';
It's passing the entire string. How do I solve this?
.format(last_updated_at)
That's how I am passing it into the where clause.
When I was not passing a parameter, as in
my_xcom_value = "{{ti.xcom_pull(task_ids='Match_Updated_date', key='QueryTimeStamp')}}"
the XCom worked fine, but now that I am passing a parameter, it no longer works.
Python callable function pushing the XCom:
def match_dates(**Kwargs):
    try:
        print("enters the try block")
        response = s3.get_object(Bucket='mygluecrawlerbucket',Key='DateTime/Users/my_date_{}.txt'.format(Kwargs['key1']))
        print("responce is ", response)
        status = response['ResponseMetadata']['HTTPStatusCode']
        if status == 200:
            print("Enters the status block ")
            data = response['Body'].read().decode("utf-8")
            ti.xcom_push(key="QueryTimeStamp_{}".format(country), value=someVariable)
PostgresOperator pulling the XCom:
import_redshift_table_zm = PostgresOperator(
    task_id='copy_data_from_redshift_zm',
    postgres_conn_id='postgres_default',
    sql="""
    BEGIN;
    create table angaza_public_spark.stag_angaza_users_zm as
    Select * FROM angaza_public_zm.users
    where last_updated_utc > '{}';
    END;
    """.format("{{ti.xcom_pull(task_ids='Match_Updated_dates_zm', key='QueryTimeStamp_{}')}}".format(country))
)
Two things. First, replace
{{ti.xcom_pull(task_ids='Match_Updated_date_{}',
key='QueryTimeStamp_{}')}}
with this (notice the space after {{ and before }}):
{{ ti.xcom_pull(task_ids='Match_Updated_date_{}',
key='QueryTimeStamp_{}') }}
Second, check which operator receives the last_updated_at parameter. Each operator in Airflow has a class variable named template_fields, and only the fields listed there are rendered by Jinja.
If you are using a custom operator, make sure last_updated_at is listed in template_fields.
For example:
template_fields = ('templates_dict', 'op_args', 'op_kwargs')
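It may also be worth checking how Python's str.format treats the braces, since that would explain the rendered SQL shown in the question: format() turns each {{ into a single literal {, so the string that reaches Airflow no longer contains a Jinja expression. Below is a minimal sketch, assuming country is 'mm' as in the question's output. Doubling the braces once more ({{{{ and }}}}) is one way to keep them, not necessarily the only one.

```python
country = "mm"  # assumed value, taken from the rendered output in the question

# str.format() treats "{{" and "}}" as escapes for single literal braces,
# so the Jinja delimiters are lost before Airflow ever sees the string.
broken = "{{ti.xcom_pull(task_ids='Match_Updated_date_{}', key='QueryTimeStamp_{}')}}".format(country, country)
print(broken)

# Doubling the braces again leaves "{{ ... }}" in the result after format().
last_updated_at = (
    "{{{{ ti.xcom_pull(task_ids='Match_Updated_date_{0}', "
    "key='QueryTimeStamp_{0}') }}}}"
).format(country)
print(last_updated_at)

# The SQL is filled in the same way. Braces that arrive through a format()
# argument are not parsed again, so they survive this second call.
sql = """
BEGIN;
create table angaza_public_spark.stag_angaza_users_zm as
Select * FROM angaza_public_zm.users
where last_updated_utc > '{}';
END;
""".format(last_updated_at)
print(sql)
```

Airflow should then render the remaining {{ ... }} at run time, since sql is a templated field of PostgresOperator.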