My Apache Airflow version is 2.5.0.
I want to access and use the values from the config JSON that is set when triggering the DAG. I have tried the following suggested solutions, but none of them worked.
The config is written in the configuration JSON section:
{"conf1": "test"}
I want to access the value of conf1 in my PythonVirtualenvOperator.
config JSON: {"conf1": "test"}
Using **context:
context['dag_run'].conf['conf1']
Error: there was no "dag_run" key in the context.
Using a Jinja template: did not work, dag_run was not defined.
dag_config = {{ dag_run }}
dag_config = {{ dag_run.conf['conf1'] }}
dag_config = "{{ dag_run }}"
DAG params: I could create default params but could not access them in the Python operator.
params={"conf1": ""}
Installing jinja2 to use Template directly: error, "dag_run" is not defined.
tm = Template("Hello {{ dag_run }}")
msg = tm.render(dag_run=dag_run)
tm = Template("Hello {{ dag_run.conf['conf1'] }}")
msg = tm.render(dag_run=dag_run)
Expectation:
To get the value 'test' inside the functions run by the PythonVirtualenvOperator, in a variable that can be used.
You can use an upstream task as a helper to feed the context variable into the PythonVirtualenvOperator:
from airflow import DAG
from airflow.decorators import task
import pendulum

with DAG(
    dag_id='test_venv',
    start_date=pendulum.datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    params={
        "param1": "default_value",
    },
    render_template_as_native_obj=True  # Prevents all params from being converted to strings
):
    # Helper task: runs in the normal Airflow process, so templates are rendered here
    @task(
        templates_dict={"my_config": "{{ params.param1 }}"}
    )
    def get_param(**kwargs):
        return kwargs["templates_dict"]["my_config"]

    # The virtualenv task only receives the already rendered value as an argument
    @task.virtualenv(
        task_id="virtualenv_python",
        requirements=["numpy"],
    )
    def python_virtual_env_operator_task(my_config):
        print(my_config)

    python_virtual_env_operator_task(get_param())
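One way to see why the helper task works: as far as I understand it, the PythonVirtualenvOperator serializes the callable's arguments and runs the function in a separate Python process, so only plain, picklable values (such as the already rendered string) make it across. The sketch below imitates that hand-off using only the standard library. `CHILD_SCRIPT` and `run_in_child` are made-up names for illustration, not Airflow APIs, and the real operator's mechanics differ in detail.

```python
import pickle
import subprocess
import sys

# Hypothetical stand-in for the function body that runs inside the virtualenv:
# it reads pickled keyword arguments from stdin and prints one of them.
CHILD_SCRIPT = (
    "import pickle, sys\n"
    "kwargs = pickle.load(sys.stdin.buffer)\n"
    "print(kwargs['my_config'])\n"
)


def run_in_child(kwargs):
    """Send plain keyword arguments to a separate interpreter and return its output."""
    result = subprocess.run(
        [sys.executable, "-c", CHILD_SCRIPT],
        input=pickle.dumps(kwargs),  # only picklable values survive this step
        capture_output=True,
        check=True,
    )
    return result.stdout.decode().strip()


# A rendered string crosses the process boundary without trouble.
child_output = run_in_child({"my_config": "test"})
print(child_output)
```

A live object such as `dag_run` is tied to the scheduler's process and database session, which is why passing the extracted value rather than the object itself is the safer route.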
You will also want to double-check that core.dag_run_conf_overrides_params is set to True, so that values passed in the trigger config override the defaults in params.
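To make the effect of that setting concrete, here is a rough sketch of the override behaviour in plain Python. `effective_params` is a hypothetical helper written for this answer, not Airflow's actual implementation, and it assumes the DAG is triggered with a config whose key matches the param name (`param1` in the DAG above).

```python
def effective_params(dag_params, run_conf, conf_overrides_params):
    """Illustrative only: merge trigger-time conf over the DAG's default params."""
    merged = dict(dag_params)  # start from the defaults declared on the DAG
    if conf_overrides_params and run_conf:
        merged.update(run_conf)  # trigger-time values win when the setting is on
    return merged


defaults = {"param1": "default_value"}
trigger_conf = {"param1": "test"}

# Setting enabled: the template {{ params.param1 }} would see the triggered value.
print(effective_params(defaults, trigger_conf, True))   # {'param1': 'test'}

# Setting disabled: the default stays in place.
print(effective_params(defaults, trigger_conf, False))  # {'param1': 'default_value'}
```

The option lives in the `[core]` section of airflow.cfg and, following Airflow's usual naming convention, can also be set through the `AIRFLOW__CORE__DAG_RUN_CONF_OVERRIDES_PARAMS` environment variable.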