I'm trying to create a custom Airflow operator and I want one of its fields to support Jinja templating. I've read the documentation but it seems that there's still something that I don't get. Here's the code for my custom operator:
class UtilETLStartJobRunOperator(UtilETLStartRunBaseOperator):
    template_fields = ('util_etl_start_run_uuid',)

    @apply_defaults
    def __init__(
            self,
            redshift_schema,
            redshift_conn_id,
            tgt_job_name,
            util_etl_start_run_uuid,
            *args,
            **kwargs):
        super().__init__(
            redshift_schema=redshift_schema,
            redshift_conn_id=redshift_conn_id,
            *args,
            **kwargs
        )
        self.util_etl_start_run_uuid = util_etl_start_run_uuid
This class inherits from UtilETLStartRunBaseOperator, which in turn inherits from BaseOperator and implements the execute method.
The problem I'm having is that when I try to use Jinja syntax to run a task created using my custom operator, I get an error message saying {taskinstance.py:1150} ERROR - 'ti' is undefined.
Here's how I'm defining my task:
start_task = UtilETLStartJobRunOperator(
    task_id='start_etl_job',
    tgt_job_name='etl_job',
    redshift_schema='my_schema',
    redshift_conn_id='my_conn_id',
    util_etl_start_run_uuid=f'{{{{ ti.xcom_pull(task_ids="{previous_task_id}") }}}}',  # assume previous_task_id is defined and contains the task_id of some upstream task
    provide_context=True,
    dag=dag  # assume dag is defined
)
What should I do so my {{ }} Jinja expression renders correctly? I think it has something to do with the provide_context parameter. I'm setting it to True here, but it doesn't seem to make any difference. Can someone please point me in the right direction?
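Turns out I was using the value of the templated field inside the operator's constructor, and at that point 'ti' isn't defined yet, hence the 'ti' is undefined error. Airflow renders template_fields only just before execute is called, so the constructor should do nothing more than store the value; anything that needs the rendered value belongs in execute.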
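To make the timing concrete, here is a minimal sketch of the lifecycle. It is a toy model that uses only the standard library, not Airflow's actual code: ToyOperator, render, run_uuid, seen_in_init and upstream_value are all hypothetical names invented for the illustration, and the regex substitution merely stands in for Jinja. The point it shows is that the constructor only ever sees the raw template string, while execute sees the rendered value.

```python
import re


def render(template, context):
    """Toy stand-in for Jinja: replace each {{ name }} with context[name]."""
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda match: str(context[match.group(1)]),
        template,
    )


class ToyOperator:
    # Names of the attributes that get rendered before execute() runs.
    template_fields = ("run_uuid",)

    def __init__(self, run_uuid):
        # Only store the value here. It is still the raw template string,
        # because no task-instance context exists at construction time.
        self.run_uuid = run_uuid
        self.seen_in_init = run_uuid  # kept only to show what __init__ saw

    def render_template_fields(self, context):
        # In this toy model the framework calls this step right before
        # execute(), once a context is available.
        for name in self.template_fields:
            setattr(self, name, render(getattr(self, name), context))

    def execute(self, context):
        # By now the attribute holds the rendered value, so use it here.
        return f"starting run {self.run_uuid}"


op = ToyOperator(run_uuid="{{ upstream_value }}")
op.render_template_fields({"upstream_value": "abc-123"})
result = op.execute({})

print(op.seen_in_init)  # the raw template, exactly as passed in
print(result)           # built from the rendered value
```

Applied to the operator in the question, this suggests keeping the plain assignment to self.util_etl_start_run_uuid in the constructor and moving any logic that reads it into execute.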