airflow jinja2 etl

In Airflow, how can I create a custom Operator that supports Jinja templating?


I'm trying to create a custom Airflow operator and I want one of its fields to support Jinja templating. I've read the documentation but it seems that there's still something that I don't get. Here's the code for my custom operator:

class UtilETLStartJobRunOperator(UtilETLStartRunBaseOperator):
    template_fields = ('util_etl_start_run_uuid',)

    @apply_defaults
    def __init__(
            self,
            redshift_schema,
            redshift_conn_id,
            tgt_job_name,
            util_etl_start_run_uuid,
            *args,
            **kwargs):
        
        super().__init__(
            redshift_schema=redshift_schema,
            redshift_conn_id=redshift_conn_id,
            *args,
            **kwargs
        )
        self.util_etl_start_run_uuid = util_etl_start_run_uuid

This class inherits from UtilETLStartRunBaseOperator which in turn inherits from BaseOperator and implements the execute method.

The problem is that when I use Jinja syntax in a task created with my custom operator, I get an error message saying {taskinstance.py:1150} ERROR - 'ti' is undefined. Here's how I'm defining my task:

start_task = UtilETLStartJobRunOperator(
    task_id='start_etl_job',
    tgt_job_name='etl_job',
    redshift_schema='my_schema',
    redshift_conn_id='my_conn_id',
    util_etl_start_run_uuid=f'{{{{ ti.xcom_pull(task_ids="{previous_task_id}") }}}}', # assume previous_task_id is defined and contains the task_id of some upstream task
    provide_context=True,
    dag=dag  # assume dag is defined
)

What should I do so my {{ }} Jinja expression renders correctly? I think it has something to do with the provide_context parameter. I'm setting it to True here, but it doesn't seem to make any difference. Can someone please point me in the right direction?


Solution

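  • It turns out I was using the value of the templated field inside the operator's constructor. Template fields are rendered only when the task runs, just before execute() is called, so at construction time 'ti' isn't defined yet, hence the 'ti' is undefined error. The constructor should only store the value; anything that needs the rendered result belongs in execute().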
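
To make the timing concrete, here is a minimal, stdlib-only sketch of the init, render, execute lifecycle. It does not use Airflow: SketchOperator, FakeTaskInstance, the toy render_template_fields body and the "generate_uuid" / "abc-123" values are all hypothetical stand-ins made up for illustration, and the regex-plus-eval renderer only imitates what Jinja does in the real thing.

```python
import re


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance; only mimics xcom_pull."""

    def __init__(self, xcoms):
        self._xcoms = xcoms

    def xcom_pull(self, task_ids):
        return self._xcoms[task_ids]


class SketchOperator:
    """Toy operator that imitates the init -> render -> execute lifecycle."""

    template_fields = ("util_etl_start_run_uuid",)

    def __init__(self, util_etl_start_run_uuid):
        # Only store the value here: it is still the raw template string,
        # and no 'ti' exists yet at construction (DAG parse) time.
        self.util_etl_start_run_uuid = util_etl_start_run_uuid

    def render_template_fields(self, context):
        # Toy renderer: evaluates each {{ ... }} expression against the context.
        # Airflow uses Jinja for this; eval() only keeps the sketch stdlib-only.
        def _render(match):
            return str(eval(match.group(1), {"__builtins__": {}}, context))

        for name in self.template_fields:
            raw = getattr(self, name)
            setattr(self, name, re.sub(r"\{\{\s*(.*?)\s*\}\}", _render, raw))

    def execute(self, context):
        # By now the field has been rendered, so it is safe to use.
        return self.util_etl_start_run_uuid


previous_task_id = "generate_uuid"  # assumed upstream task_id
op = SketchOperator(
    util_etl_start_run_uuid=f'{{{{ ti.xcom_pull(task_ids="{previous_task_id}") }}}}'
)
value_at_init = op.util_etl_start_run_uuid  # still the unrendered template

# At run time a context containing 'ti' exists and the fields get rendered.
context = {"ti": FakeTaskInstance({"generate_uuid": "abc-123"})}
op.render_template_fields(context)
value_at_execute = op.execute(context)
```

The same shape applies to the real operator: assign self.util_etl_start_run_uuid in __init__ and read it only inside execute(). As far as I know, provide_context is a PythonOperator argument in Airflow 1.x and has no effect on how template_fields are rendered, so it should not be needed here.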