I've been trying to run two PostgresOperator tasks in my DAG, which looks like this:
import logging
from datetime import datetime

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

default_args = {
    'owner': 'local',
}

log = logging.getLogger(__name__)

# Disable the statement timeout for this session, then call the function.
TEMP_SETTLEMENT = """
set statement_timeout to 0;
select function_a();
"""

VACUM_SETTLEMENT = """
vacuum (verbose, analyze) test_table;
"""

try:
    with DAG(
        dag_id='temp-test',
        default_args=default_args,
        schedule_interval=None,
        start_date=datetime(2021, 10, 1),
        max_active_runs=1,
        catchup=False,
    ) as dag:
        pg = PostgresOperator(
            task_id="data",
            postgres_conn_id="connection_1",
            database="DB_test",
            autocommit=True,
            sql=TEMP_SETTLEMENT,
        )
        vacum = PostgresOperator(
            task_id="vacum",
            postgres_conn_id="connection_1",
            database="DB_test",
            autocommit=True,
            sql=VACUM_SETTLEMENT,
        )
        # Run the vacuum only after the data task has finished.
        pg >> vacum
except ImportError as e:
    log.warning("Could not import DAGs: %s", str(e))
I keep getting a statement timeout when I run the task that uses TEMP_SETTLEMENT. Is there any way to keep statement_timeout=0?
Thanks
Update:
Note that PostgresOperator is deprecated. It is best to migrate to apache-airflow-providers-common-sql and then use:
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

SQLExecuteQueryOperator(
    ...,
    conn_id='postgres_default',  # specify the Postgres conn_id
    hook_params={'options': '-c statement_timeout=3000ms'},
)
If for some reason you are not able to use SQLExecuteQueryOperator, then:
For apache-airflow-providers-postgres>=5.5.2 you can do:
PostgresOperator(
    ...,
    hook_params={'options': '-c statement_timeout=3000ms'},
)
For apache-airflow-providers-postgres>=4.1.0 you can do:
PostgresOperator(
    ...,
    runtime_parameters={'statement_timeout': '3000ms'},
)
This capability was added in the PR that resolved the issue.
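The `options` value passed through hook_params in the examples above is a libpq-style string of `-c name=value` flags. Below is a minimal sketch of building that string from a dict, so that several settings can be combined. The helper name `build_pg_options` is hypothetical and is not part of Airflow or psycopg2.

```python
def build_pg_options(settings):
    """Render a dict of Postgres settings as a libpq `options` string.

    Hypothetical helper for illustration only. Unescaped whitespace
    separates arguments inside `options`, so this sketch simply rejects
    names or values that contain whitespace.
    """
    parts = []
    for name, value in settings.items():
        name, value = str(name), str(value)
        if any(ch.isspace() for ch in name + value):
            raise ValueError(f"setting {name!r} must not contain whitespace")
        parts.append(f"-c {name}={value}")
    return " ".join(parts)


# In Postgres, statement_timeout=0 disables the timeout,
# which is what the question asks for.
hook_params = {"options": build_pg_options({"statement_timeout": 0})}
print(hook_params)
```

The resulting dict can be passed as `hook_params=hook_params` to either operator shown above, assuming a provider version that supports that argument.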
Original Answer:
You didn't mention it, but from your description I assume that the timeout comes from Postgres and not from Airflow.
At the moment, PostgresOperator does not allow overriding the hook/connection settings.
To solve your issue, you will need to edit connection_1: in its extra field, as explained in the docs, add statement_timeout:
{'statement_timeout': '3600s'}
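Airflow stores a connection's extra field as a JSON string, so the value entered there needs double quotes rather than Python-style single quotes. A small sketch that turns the dict from this answer into a string that can be pasted into the field (the variable names are illustrative):

```python
import json

# The setting suggested in this answer, as a plain Python dict.
extra = {"statement_timeout": "3600s"}

# json.dumps emits valid JSON with double-quoted keys and values.
extra_json = json.dumps(extra)
print(extra_json)  # {"statement_timeout": "3600s"}
```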
I opened https://github.com/apache/airflow/issues/21486 as a follow-up feature request to allow setting statement_timeout directly from the operator.