
How can the default list of email addresses be rendered from a variable in Airflow?


I have a lot of Airflow DAGs and I'd like to automatically send email notifications to the same list of recipients (stored in an Airflow variable) on any task failure, so I'm using the following default operator configuration defined at the DAG level:

dag = DAG(
    ...
    default_args = {
        ...
        "email": "{{ ','.join(var.json.get('my_email_list', [])) }}",
        "email_on_failure": True,
        ...
    },
    ...
)

Unfortunately, it looks like the email argument doesn't support templating: it is passed to the email back-end as-is, without being rendered, so my approach doesn't work.
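To see why the value stays unrendered, here is a simplified, stdlib-only model of the mechanism (a sketch, not Airflow's actual code): before execution, only the attributes whose names appear in the operator class's template_fields are rendered, so anything else, including email, is left untouched. The Fake* classes, the render_template_fields helper and the use of str.format_map as a stand-in for Jinja are all hypothetical, chosen only so the example runs without Airflow.

```python
from typing import Any, Dict, Tuple


class FakeBaseOperator:
    """Hypothetical stand-in for airflow.models.BaseOperator (not the real class)."""

    # This stub base class declares no templated fields of its own.
    template_fields: Tuple[str, ...] = ()

    def __init__(self, email: str = "", **kwargs: Any) -> None:
        self.email = email
        for name, value in kwargs.items():
            setattr(self, name, value)


class FakeOperator(FakeBaseOperator):
    # Only attributes named here are rendered; "email" is not among them.
    template_fields = ("message",)


def render_template_fields(task: FakeBaseOperator, context: Dict[str, Any]) -> None:
    """Render the string attributes listed in the task class's template_fields.

    str.format_map is a stdlib stand-in for Jinja; Airflow's real rendering is richer.
    """
    for name in type(task).template_fields:
        value = getattr(task, name)
        if isinstance(value, str):
            setattr(task, name, value.format_map(context))


task = FakeOperator(email="{email_list}", message="Notify {email_list}")
render_template_fields(task, {"email_list": "a@example.com,b@example.com"})
print(task.message)  # Notify a@example.com,b@example.com
print(task.email)    # {email_list}  (left unrendered)
```

The only thing that decides whether an attribute gets rendered is its presence in template_fields, which is why both workaround ideas below revolve around that tuple.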

Could anybody suggest a decent workaround for my particular case, please? I don't really want to hard-code the list of email addresses in the source code, because storing them in an Airflow variable gives much more flexibility.


Solution

  • Here is my somewhat hacky solution. The email argument is not templated in BaseOperator, there is no way to tweak template_fields at the task level (by "task" I mean a configured instance of an operator), and I don't want to define dummy subclasses of each and every built-in operator just to add email to template_fields, e.g.:

    from airflow.operators.python import PythonOperator
    
    class MyPythonOperator(PythonOperator):
        template_fields = PythonOperator.template_fields + ("email",)  # this sucks!
    

    So I've settled on the following monkey-patching approach, which dynamically adds custom template fields to every operator class that has already been imported at the moment it is called (it walks BaseOperator.__subclasses__() recursively). The function has to be defined in a shared/common module, then imported and called at the module level of each DAG file, after the operator imports:

    from airflow.models import BaseOperator
    
    def extend_operator_template_fields_with(
        extra_template_fields,
        base_operator_class=BaseOperator,
    ) -> None:
        for operator_class in base_operator_class.__subclasses__():
            # Use a dict (w/o values) instead of a set to keep the original order of template fields for the operator class.
            template_fields_dict = dict.fromkeys(operator_class.template_fields)
            template_fields_dict.update(dict.fromkeys(extra_template_fields))
            operator_class.template_fields = tuple(template_fields_dict.keys())
    
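            # Recurse so that indirect subclasses (e.g. subclasses of PythonOperator) get the extra fields too.
            extend_operator_template_fields_with(extra_template_fields, base_operator_class=operator_class)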
    

    P.S. I'm still open to a more elegant solution; I just haven't been able to find one yet (I'm using Airflow 2.2.5).
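Since extend_operator_template_fields_with only relies on Python's built-in __subclasses__(), its behaviour can be checked without Airflow installed. The sketch below reproduces the same logic against a small hierarchy of hypothetical stub classes (FakeBaseOperator and the other Fake* names are not Airflow classes, and their field tuples are only illustrative). It shows that indirect subclasses are reached through the recursion, that the original field order is kept, and that a class defined after the call which declares its own template_fields is not patched, so the call should come after the operator imports.

```python
from typing import Sequence, Tuple


class FakeBaseOperator:
    """Hypothetical stand-in for airflow.models.BaseOperator."""

    template_fields: Tuple[str, ...] = ()


class FakeBashOperator(FakeBaseOperator):
    template_fields = ("bash_command", "env")


class FakePythonOperator(FakeBaseOperator):
    template_fields = ("templates_dict", "op_args", "op_kwargs")


class FakeBranchPythonOperator(FakePythonOperator):
    # A grandchild of the base class that inherits template_fields from its parent.
    pass


def extend_operator_template_fields_with(
    extra_template_fields: Sequence[str],  # a list/tuple: it is iterated once per class
    base_operator_class: type = FakeBaseOperator,
) -> None:
    # Same logic as the function above, with the stub base class as the default.
    for operator_class in base_operator_class.__subclasses__():
        template_fields_dict = dict.fromkeys(operator_class.template_fields)
        template_fields_dict.update(dict.fromkeys(extra_template_fields))
        operator_class.template_fields = tuple(template_fields_dict.keys())
        extend_operator_template_fields_with(extra_template_fields, base_operator_class=operator_class)


extend_operator_template_fields_with(["email"])


class FakeSqlOperator(FakeBaseOperator):
    # Defined after the call, with its own template_fields: not patched.
    template_fields = ("sql",)


print(FakeBashOperator.template_fields)          # ('bash_command', 'env', 'email')
print(FakeBranchPythonOperator.template_fields)  # ('templates_dict', 'op_args', 'op_kwargs', 'email')
print(FakeSqlOperator.template_fields)           # ('sql',)
```

Note that the base class itself is never modified, only its subclasses, which is enough because every task is an instance of some concrete operator subclass.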