
Run PostgresOperator with statement_timeout in an Airflow DAG


I've been trying to run two PostgresOperator tasks in my DAG, which looks like this:

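import logging
from datetime import datetime

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

default_args = {
    'owner': 'local',
}

log = logging.getLogger(__name__)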

TEMP_SETTLEMENT ="""
set statement_timeout to 0;
select function_a();
"""

VACUM_SETTLEMENT="""
vacuum (verbose, analyze) test_table;
"""

try:
    with DAG(
        dag_id='temp-test',
        default_args=default_args,
        schedule_interval=None,
        start_date=datetime(2021, 10, 1),
        max_active_runs=1,
        catchup=False
    ) as dag:

        pg = PostgresOperator(
            task_id="data",
            postgres_conn_id="connection_1",
            database="DB_test",
            autocommit=True,
            sql=TEMP_SETTLEMENT,
        )

        vacum = PostgresOperator(
            task_id="vacum",
            postgres_conn_id="connection_1",
            database="DB_test",
            autocommit=True,
            sql=VACUM_SETTLEMENT,
        )

        pg >> vacum

except ImportError as e:
    log.warning("Could not import DAGs: %s", str(e))

I keep getting a statement timeout when I run TEMP_SETTLEMENT. Is there any way to keep statement_timeout=0?

Thanks


Solution

  • Update:

    Note that PostgresOperator is deprecated. It is best to migrate to apache-airflow-providers-common-sql and use:

    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
    
    SQLExecuteQueryOperator(
        ...,
        conn_id='postgres_default',  # specify the Postgres conn_id
        hook_params={'options': '-c statement_timeout=3000ms'},
    )
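
For the case in the question (no timeout at all), the same options string can carry statement_timeout=0, which PostgreSQL treats as "timeout disabled". Below is a minimal sketch of assembling that string. build_pg_options is a hypothetical helper, not part of Airflow, and it assumes simple values without whitespace (libpq would need those escaped).

```python
def build_pg_options(settings):
    """Build a libpq-style options string, e.g. '-c statement_timeout=0'.

    Hypothetical helper for illustration; not part of Airflow or psycopg2.
    """
    parts = []
    for name, value in settings.items():
        text = str(value)
        # Whitespace inside a value would need backslash escaping for libpq,
        # which this sketch does not attempt.
        if any(ch.isspace() for ch in text):
            raise ValueError(f"value for {name!r} must not contain whitespace")
        parts.append(f"-c {name}={text}")
    return " ".join(parts)


# Assumed setting for the question's case: 0 disables the statement timeout.
hook_params = {"options": build_pg_options({"statement_timeout": 0})}
print(hook_params)
```

The resulting dict could then be passed as hook_params=hook_params to the operator, in place of the 3000ms literal.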
    

    If for some reason you are not able to use SQLExecuteQueryOperator then:

    For apache-airflow-providers-postgres>=5.5.2 you can do:

    PostgresOperator(
        ...,
        hook_params={'options': '-c statement_timeout=3000ms'},
    )
    

    For apache-airflow-providers-postgres>=4.1.0 you can do:

    PostgresOperator(
        ...,
        runtime_parameters={'statement_timeout': '3000ms'},
    )
    

    This capability was added in the PR that resolved the issue.

    Original Answer:

    You didn't mention it, but from your description I assume that the timeout comes from Postgres and not from Airflow.

    At the moment, PostgresOperator does not allow overriding the hook/connection settings. To solve your issue, edit connection_1 and, as explained in the docs, add statement_timeout to its extra field:

    {"statement_timeout": "3600s"}
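
Airflow parses a connection's extra field as JSON, so keys and string values need double quotes there. One way to avoid quoting mistakes is to generate the value with json.dumps, as in this small sketch. The key name simply follows the answer above; whether the driver accepts it depends on the provider version.

```python
import json

# Settings to store in the connection's extra field (key taken from the answer).
extra_settings = {"statement_timeout": "3600s"}

# json.dumps yields valid JSON with double-quoted keys and values.
extra = json.dumps(extra_settings)
print(extra)  # {"statement_timeout": "3600s"}
```

The printed string can be pasted into the Extra field of connection_1 in the Airflow UI.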
    

    I opened https://github.com/apache/airflow/issues/21486 as a follow-up feature request to allow setting statement_timeout directly from the operator.