I have an Airflow DAG with a PostgresOperator that executes a SQL query. I want to switch to a different database (using the same connection) through the run configuration (Trigger DAG w/ config), but it fails with the error: database "{{ dag_run.conf['database'] }}" does not exist
I tried to create a custom PostgreSQL operator with code similar to the code here.
I added the code below, but it gives me the same error (the template is not rendered):
from airflow.providers.postgres.operators.postgres import PostgresOperator as _PostgresOperator

class PostgresOperator(_PostgresOperator):
    template_fields = [*_PostgresOperator.template_fields, "database"]
PostgresOperator is deprecated. You should migrate to SQLExecuteQueryOperator.
In older versions, the equivalent of database in PostgresOperator is schema in SQLExecuteQueryOperator, as can be seen here. However, in newer versions (since this PR) it's simply database.
Since SQLExecuteQueryOperator is a generic operator, it lets you pass hook-specific parameters through hook_params. However, hook_params is not a templated field, so you should subclass SQLExecuteQueryOperator to add it as a templated field.
from datetime import datetime
from typing import Sequence

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

class CustomSQLExecuteQueryOperator(SQLExecuteQueryOperator):
    # Add hook_params to the inherited templated fields so Jinja renders it
    template_fields: Sequence[str] = tuple({"hook_params"} | set(SQLExecuteQueryOperator.template_fields))

with DAG('sql_example', start_date=datetime(2023, 1, 1), schedule=None):
    CustomSQLExecuteQueryOperator(
        task_id='postgres',
        conn_id='postgres_default',
        sql="SELECT 1",
        hook_params={"database": "{{ ds }}"},  # ds is not a valid schema just to show render works
    )
Which will give:
Obviously, 2024-04-25 is not a valid schema, but this is just to show that rendering works.