We are designing variable-selection and parameter-setting logic that needs to be evaluated when the DAG is triggered. Our DAGs are generated before execution, so we've decided to turn our static code into custom macros.
Until now there was code defined in between the operator definitions, so it ran when the DAG was generated by the DAG generator code. This code couldn't handle runtime arguments for selecting the proper Airflow Variables.
import ast
from airflow.models import Variable

for table_name in ast.literal_eval(Variable.get('PYTHON_LIST_OF_TABLES')):
    dag_id = "TableLoader_" + str(table_name)
    default_dag_args = {...}
    schedule = None
    globals()[dag_id] = create_dag(dag_id, schedule, default_dag_args)
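# For reference (hypothetical value): 'PYTHON_LIST_OF_TABLES' is assumed to hold
# a Python list literal as a string, e.g. "['customers', 'orders', 'invoices']",
# which ast.literal_eval parses into a real list at DAG-generation time.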
def create_dag(dag_id, schedule, default_dag_args):
    with DAG(
        default_args=default_dag_args,
        dag_id=dag_id,
        schedule_interval=schedule,
        user_defined_macros={"load_type_handler": load_type_handler}
    ) as dag:
        # static python code which sets pipeline_rt_args for all generated DAGs the same way;
        # this static code could set only one type (INITIAL or INCREMENTAL),
        # but we want to decide during execution now
        # Operator Definitions
        OP_K = CloudDataFusionStartPipelineOperator(
            task_id='PREFIX_' + str(table),
            # ---> Can't handle runtime parameters <---
            runtime_args=pipeline_rt_args,
            # ...
        )
        OP_1 >> OP_2 >> ... >> OP_K >> ... >> OP_N
    return dag
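For context, a minimal sketch of the kind of static code that used to run between the operator definitions; the Variable naming pattern is an assumption inferred from the macro shown further below, and the point is that the load type is frozen at generation time:

# runs at DAG-generation time, so the choice cannot react to trigger-time conf
load_type = 'INITIAL'  # hard-coded: INITIAL or INCREMENTAL, but never both
pipeline_rt_args = ast.literal_eval(
    Variable.get('PREFIX_' + load_type + '_' + dag_id)
)['runtime_args']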
Now we want to pass the load_type (e.g. INITIAL or INCREMENTAL) while triggering the DAG from the UI or the REST API, so we need to modify this old (static) behavior, which handles only one case but not both, to fetch the proper Airflow Variables and create the proper object for our CloudDataFusionStartPipelineOperator.
e.g.:
{"load_type":"INCREMENTAL"}
# or
{"load_type":"INITIAL"}
But if we do something like:
def create_dag(dag_id, schedule, default_dag_args):
    def extend_runtime_args(prefix, param, field_name, date, job_id):
        # reading the trigger-time parameter
        load_type = param.conf["load_type"]
        # getting the proper Airflow Variable (depending on the current load type)
        result = eval(Variable.get(prefix + '_' + load_type + '_' + dag_id))[field_name]
        # setting 'job_id', 'dateValue', 'date', 'GCS_Input_Path' for CloudDataFusionStartPipelineOperator
        # ...
        return rt_args

    with DAG(  # ...
        user_defined_macros={
            "extend_runtime_args": extend_runtime_args
        }
    ) as dag:
        # removed the static code (which executed only at generation time)
        # Operator Definitions
        OP_K = CloudDataFusionStartPipelineOperator(
            task_id='PREFIX_' + str(table),
            # ---> handles runtime arguments with custom macro <---
            runtime_args="""{{ extend_runtime_args('PREFIX', dag_run, 'runtime_args', macros.ds_format(yesterday_ds_nodash, "%Y%m%d", "%Y_%m_%d"), ti.job_id) }}""",
            # ...
        )
        OP_1 >> OP_2 >> ... >> OP_K >> ... >> OP_N
    return dag
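For reference, a hypothetical example of the Airflow Variable the macro resolves; the naming pattern PREFIX_<load_type>_<dag_id> and the dict-literal content are inferred from the eval() call above:

# Variable name:  PREFIX_INCREMENTAL_TableLoader_customers   (hypothetical)
# Variable value: "{'runtime_args': {'GCS_Input_Path': 'gs://some-bucket/incremental/customers'}}"
# so eval(Variable.get(...))['runtime_args'] yields the inner dict.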
Notice:
What we need here is a "future" evaluation of custom logic (not evaluated at DAG generation time) which returns an object; that's why we use templates here.
We experience the following: even though extend_runtime_args returns an object, CloudDataFusionStartPipelineOperator fails because the rendered runtime_args property is a string and not an object (Jinja rendering yields a string).
Questions:
How could we return an object after evaluating the Jinja template (and do this in the 'future')?
You can create your own custom operator deriving from CloudDataFusionStartPipelineOperator, make it accept the string and convert it to the object required by CloudDataFusionStartPipelineOperator, and use this new operator. The runtime_args value is a dictionary, so it should be as easy as json.loads() to get it back, I believe.
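A minimal sketch of such a subclass, assuming the google provider import path below (the class name is ours). Note that for json.loads() to work, the macro should render valid JSON, e.g. end with return json.dumps(rt_args) instead of returning a raw dict:

import json

from airflow.providers.google.cloud.operators.datafusion import (
    CloudDataFusionStartPipelineOperator,
)

class JsonRuntimeArgsOperator(CloudDataFusionStartPipelineOperator):
    """Accepts runtime_args as a templated JSON string and decodes it before use."""

    def execute(self, context):
        # by execute() time the Jinja template has been rendered,
        # so runtime_args holds the macro's output as a string
        if isinstance(self.runtime_args, str):
            self.runtime_args = json.loads(self.runtime_args)
        return super().execute(context)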
Can we convert the string somehow?
Yep, the json.loads() approach above should do. Also, if only a few parameters in runtime_args change, it can be easier to have more than one macro and return the changed values directly in multiple Jinja strings inside a dictionary. Something like:
runtime_args = {
    'PREFIX': "{{ dag_run }}",
    'date': "{{ macros.ds_format(....) }}",
}
Airflow processes basic structures like dicts or lists recursively when a field is templated, so you can keep the object structure and use Jinja macros as values (actually, you can also have Jinja macros as keys, etc.).
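A sketch of what that could look like for the operator above, with hypothetical keys; the dict stays a dict, and only the embedded Jinja strings are rendered at task run time:

OP_K = CloudDataFusionStartPipelineOperator(
    task_id='PREFIX_' + str(table),
    # a plain dict: Airflow renders each templated string value recursively
    runtime_args={
        'load_type': "{{ dag_run.conf['load_type'] }}",
        'date': "{{ macros.ds_format(yesterday_ds_nodash, '%Y%m%d', '%Y_%m_%d') }}",
        'job_id': "{{ ti.job_id }}",
    },
    # ...
)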
How could we ensure that the logic here is executed when the DAG is run, and not right after the DAG was generated?
The Jinja templates are only evaluated when tasks are executed, so you are fine here.
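If you want to double-check, the task instance's "Rendered Template" view in the UI, or the airflow tasks render CLI command (Airflow 2), shows a task's templated fields after Jinja evaluation without running the task.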
Are Jinja templates / custom macros a good or bad pattern here for handling trigger-time arguments?
Very good pattern. That's what they are for.