Access dag_run.conf in a custom PythonOperator in Airflow


I extended the existing PythonOperator in Airflow as follows:

class myPythonOperator(PythonOperator):
    def __init__(self,**kwargs) -> None:
        self.name = kwargs.get("name", "name is not provided")      

    def execute(self, context,**kwargs):
        print(self.name)
        super(myPythonOperator, self).execute(context)

And my task was defined as:

def task1(**kwargs):
    name = kwargs.get("name", "name is not provided")
    print(name)

And with the following DAG:

myTask = myPythonOperator(
    task_id='myTask',
    python_callable=task1,
    op_kwargs={"name": "{{ dag_run.conf['name'] }}"},
    provide_context=True
)

When triggering the DAG, I provided a configuration JSON from the Airflow web UI: {"name":"foo"}

The problem is that the name specified in the JSON can only be accessed from task1. In execute(), it always prints "name is not provided".

Does anyone know the trick to access this dag_run.conf from the __init__() function of the operator?

Any help will be appreciated. Thanks


Solution

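  • The way to access dag_run.conf from an inherited class is to use template_fields in Airflow. Jinja templates such as {{ dag_run.conf['name'] }} are rendered only for the operator attributes listed in template_fields, and rendering happens just before execute() runs. No DAG run exists when __init__() is called, so the value cannot be read there; it becomes available in execute().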