
How to return Object after evaluating the Templates in Airflow?

We are designing variable-selection and parameter-setting logic that needs to be evaluated when the DAG is triggered. Our DAGs are generated ahead of execution, so we have decided to move our static code into custom macros.

Until now, this logic sat between the operator definitions, so it ran when the DAG generator code built the DAG. It could not use runtime arguments to select the proper Airflow Variables.

for table_name in ast.literal_eval(Variable.get('PYTHON_LIST_OF_TABLES')):
    dag_id = "TableLoader_" + str(table_name)
    schedule = None
    globals()[dag_id] = create_dag(dag_id, schedule, default_dag_args)

def create_dag(dag_id, schedule, default_dag_args):

    with DAG(
        user_defined_macros={ "load_type_handler": load_type_handler }
    ) as dag:

        # static python code which sets pipeline_rt_args for all generated DAGs the same way
        # this static code could set only one type (INITIAL or INCREMENTAL)
        # but we want to decide during the execution now

        # Operator Definitions
        OP_K = CloudDataFusionStartPipelineOperator(

            # ---> Can't handle runtime parameters <---
            # ...

        OP_1 >> OP_2 >> ... >> OP_K >> ... >> OP_N
        return dag

Now we want to pass the load_type (e.g. INITIAL or INCREMENTAL) when we trigger the DAG from the UI or the REST API. So we need to change the old static behavior, which handles only one case, to fetch the proper Airflow Variables and build the proper object for our CloudDataFusionStartPipelineOperator:


# or

But if we do something like:

def create_dag(dag_id, schedule, default_dag_args):

    def extend_runtime_args(prefix, param, field_name, date, job_id):

        # reading the Trigger-time parameter
        load_type = param.conf["load_type"]
        # getting the proper Airflow Variable (depending on current load type)
        # ast.literal_eval only parses literals, unlike eval(), which runs arbitrary code
        result = ast.literal_eval(Variable.get(prefix+'_'+load_type+'_'+dag_id))[field_name]

        # setting 'job_id', 'dateValue', 'date', 'GCS_Input_Path' for CloudDataFusionStartPipelineOperator
        # ...

        return rt_args

    with DAG( #...
            "extend_runtime_args": extend_runtime_args
        }) as dag:
        # removed static code (which executes only in generation time)

        # Operator Definitions
        OP_K = CloudDataFusionStartPipelineOperator(

            # ---> handles runtime arguments with custom macro <---
            runtime_args="""{{ extend_runtime_args('PREFIX', dag_run, 'runtime_args', macros.ds_format(yesterday_ds_nodash,"%Y%m%d","%Y_%m_%d"), ti.job_id) }}""",
            # ...

        OP_1 >> OP_2 >> ... >> OP_K >> ... >> OP_N
        return dag
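
One fragile spot in the macro above is `param.conf["load_type"]`: `dag_run.conf` may be empty or None when the DAG is triggered without a payload. A minimal sketch of a more defensive lookup follows, assuming the only valid load types are INITIAL and INCREMENTAL. The helper name `variable_name_for`, the INCREMENTAL default, and the sample DAG id are hypothetical and not part of the original code.

```python
ALLOWED_LOAD_TYPES = ("INITIAL", "INCREMENTAL")  # the two types named in the question


def variable_name_for(prefix, conf, dag_id, default="INCREMENTAL"):
    """Build the Airflow Variable name for the load type found in conf.

    conf stands in for dag_run.conf; it may be None or lack the key when
    the DAG is triggered without a payload. The default is an assumption.
    """
    load_type = (conf or {}).get("load_type", default)
    if load_type not in ALLOWED_LOAD_TYPES:
        raise ValueError(f"unsupported load_type: {load_type!r}")
    return f"{prefix}_{load_type}_{dag_id}"


# Hypothetical DAG id, following the "TableLoader_" naming used above
print(variable_name_for("PREFIX", {"load_type": "INITIAL"}, "TableLoader_orders"))
print(variable_name_for("PREFIX", None, "TableLoader_orders"))
```

Failing early with a clear message on an unknown load type is a design choice here; falling back silently would hide typos in the trigger payload.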


What we need here is deferred ("future") evaluation of custom logic that returns an object, rather than evaluation at DAG generation time. That is why we use templates here.

We observe the following:

  • inside the custom macro function extend_runtime_args the return type is an object
  • after the evaluation of the Jinja template the return type changes to string
  • the CloudDataFusionStartPipelineOperator fails because the runtime_args property is a string and not an object


  • How can we return an object after the Jinja template is evaluated (and do this in the 'future')?
    • Can we convert the string somehow?
  • How can we ensure that this logic runs when the DAG is executed and not right after the DAG is generated?
  • Are Jinja templates / custom macros a good or bad pattern for handling trigger-time arguments?


  • How can we return an object after the Jinja template is evaluated (and do this in the 'future')?

    You can create your own custom operator that derives from CloudDataFusionStartPipelineOperator, accepts a string, and converts it to the object the parent operator requires, then use this new operator. runtime_args is a dictionary, so getting it back should be as easy as json.loads(), I believe, provided the macro returns a JSON string (for example via json.dumps()). A rendered Python dict uses single quotes, which json.loads() rejects.
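
A minimal sketch of that conversion, using only the standard library. Here `str()` stands in for what the rendered template contains when the macro returns a dict, and the sample values and the helper name `parse_runtime_args` are hypothetical. It shows why the rendered dict fails `json.loads()` and one way to accept both forms:

```python
import ast
import json

rt_args = {"job_id": "42", "date": "2021_01_31"}  # hypothetical macro result

# What the template yields if the macro returns a dict: its str() form
rendered = str(rt_args)

try:
    json.loads(rendered)
    parsed_as_json = True
except json.JSONDecodeError:
    parsed_as_json = False  # single-quoted keys are not valid JSON


def parse_runtime_args(value):
    """Return a dict from a dict, a JSON string, or a Python-literal string."""
    if isinstance(value, dict):
        return value
    try:
        parsed = json.loads(value)
    except json.JSONDecodeError:
        # ast.literal_eval accepts only literals, unlike eval()
        parsed = ast.literal_eval(value)
    if not isinstance(parsed, dict):
        raise TypeError(f"expected a dict, got {type(parsed).__name__}")
    return parsed


print(parsed_as_json)
print(parse_runtime_args(rendered) == rt_args)
print(parse_runtime_args(json.dumps(rt_args)) == rt_args)
```

In a subclass of CloudDataFusionStartPipelineOperator, a helper like this could be applied to self.runtime_args at the start of execute(), before calling the parent implementation. Having the macro return json.dumps(rt_args) keeps the rendered string valid JSON and avoids the fallback entirely.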

    Can we convert the string somehow?

    Yes, json.loads() as described above should do. Also, if only a few parameters in runtime_args change, it may be easier to use more than one macro and return the changed values directly as separate Jinja strings inside a dictionary. Something like:

    runtime_args = {
       'PREFIX': "{{ dag_run }}",
       'date': "{{ macros.ds_format(....) }}",
       # ...
    }

    Airflow renders templated fields recursively through basic structures such as dicts and lists, so you can keep the object structure and use Jinja expressions as the values. Note that for dicts only the values are rendered; the keys are passed through unchanged.
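
To illustrate the recursive rendering described above, here is a toy stand-in written with the standard library only. It is not Airflow's implementation: `render_value` and the bare `{{ name }}` substitution are simplified assumptions, and the bucket path is made up. The point is only that the container survives while each string value is rendered on its own.

```python
import re

# Matches a bare placeholder such as "{{ ds }}"; real Jinja supports far more
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_value(value, context):
    """Render strings; recurse into dict values and list/tuple items."""
    if isinstance(value, str):
        return PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), value)
    if isinstance(value, dict):
        return {k: render_value(v, context) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return type(value)(render_value(v, context) for v in value)
    return value  # numbers, None, etc. pass through unchanged


runtime_args = {
    "job_id": "{{ job_id }}",
    "paths": ["gs://bucket/{{ ds }}/input"],  # hypothetical path
    "retries": 3,
}
rendered = render_value(runtime_args, {"job_id": 42, "ds": "2021_01_31"})
print(rendered)
```

The result is still a dict, but every rendered value is a string ("42", not 42), so anything that must be a number or nested object downstream still needs converting.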

    How can we ensure that this logic runs when the DAG is executed and not right after the DAG is generated?

    Jinja templates are only evaluated when tasks are executed, so you are fine here.

    Are Jinja templates / custom macros a good or bad pattern for handling trigger-time arguments?

    Very good pattern. That's what they are for.