
Apache-airflow 2.5.0: How to utilize config JSON in Python VirtualEnv Operator


My Apache Airflow version is 2.5.0.

I want to access and use the values from the config JSON that is set when triggering the DAG. I have tried the following suggested solutions, but none of them worked.

The config is written in the configuration JSON section:
{"conf1": "test"}

I want to access the value of conf1 in my PythonVirtualenvOperator.

  • using **context
    context['dag_run'].conf['conf1']
    Error: there was no "dag_run" key in the context.

  • using a Jinja template: did not work, dag_run is not defined
    dag_config = {{ dag_run }}
    dag_config = {{ dag_run.conf['conf1'] }}
    dag_config = "{{ dag_run }}"

  • DAG params: I could create default params but could not access them in the Python operator
    params={"conf1": ""}

  • Installing jinja2 to use Template directly: error, "dag_run" is not defined.
    tm = Template("Hello {{ dag_run }}")
    msg = tm.render(dag_run=dag_run)
    tm = Template("Hello {{ dag_run.conf['conf1'] }}")
    msg = tm.render(dag_run=dag_run)

Expectation:

To get the value 'test' into a variable that can be used in the functions run by the PythonVirtualenvOperator.


Solution

  • You can use an upstream task as a helper that reads the value from the context and passes it into the PythonVirtualenvOperator:

    from airflow import DAG
    from airflow.decorators import task
    import pendulum
    
    with DAG(
        dag_id='test_venv',
        start_date=pendulum.datetime(2023, 1, 1),
        schedule=None,
        catchup=False,
        params={
            "param1": "default_value",
        },
        render_template_as_native_obj=True  # Render templates to native Python objects instead of strings
    ):
    
        @task(
            templates_dict={"my_config" : "{{ params.param1 }}"}
        )
        def get_param(**kwargs):
            return kwargs["templates_dict"]["my_config"]
    
        @task.virtualenv(
            task_id="virtualenv_python",
            requirements=["numpy"],
        )
        def python_virtual_env_operator_task(my_config):
            print(my_config)
    
        python_virtual_env_operator_task(get_param())
    

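    Also double-check that core.dag_run_conf_overrides_params is set to True, so that values passed in the trigger-time config JSON override the defaults in params. The key in the config JSON has to match the param name: the example above reads param1, so either trigger the DAG with {"param1": "test"} or rename the param to conf1.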
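
To make the effect of that setting concrete, here is a minimal plain-Python sketch of how trigger-time config relates to the DAG's default params. It is a toy model, not Airflow's implementation: `resolve_params` and its `conf_overrides_params` argument are hypothetical names that stand in for the behaviour controlled by core.dag_run_conf_overrides_params, under the assumption that the config is merged into params key by key.

```python
def resolve_params(default_params, trigger_conf, conf_overrides_params=True):
    """Toy model (hypothetical helper, not an Airflow API).

    Returns the params a template would see, assuming the trigger-time
    config is merged into the DAG's default params key by key.
    """
    resolved = dict(default_params)  # copy so the defaults stay untouched
    if conf_overrides_params and trigger_conf:
        # Keys present in the trigger config replace or extend the defaults.
        resolved.update(trigger_conf)
    return resolved


defaults = {"param1": "default_value"}

# Config key matches the param name: the default is overridden.
print(resolve_params(defaults, {"param1": "test"}))
# {'param1': 'test'}

# Override switched off: the default survives.
print(resolve_params(defaults, {"param1": "test"}, conf_overrides_params=False))
# {'param1': 'default_value'}

# Config key does not match: param1 keeps its default value.
print(resolve_params(defaults, {"conf1": "test"}))
# {'param1': 'default_value', 'conf1': 'test'}
```

The last case is the one to watch for with the question's config: a template that reads params.param1 would still see the default when the trigger JSON only contains conf1.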