
How to pass parameters to my .sql file in PostgresOperator (Airflow)?


I have a SQL file and I want to pass parameters to it using PostgresOperator.

"""select * from table_{} where id > ID """.format(mytable,myID)

My PostgresOperator:

import_redshift_table = PostgresOperator(
    task_id='copy_data_from_redshift_{}'.format(country),
    postgres_conn_id='postgres_default',
    sql="""
        select * from table_{} where id > {}
    """.format(mytable, myID),
)

How can I do the same with a referenced .sql file, i.e. pass my parameters into the .sql file while still using .format(mytable, myID)?


Solution

  • As explained in the How-to Guide for PostgresOperator, you can place your SQL in a file within a subfolder in the dag directory:

    -- dags/sql/birth_date.sql
    SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC {{ params.begin_date }} AND {{ params.end_date }};
    

    Use params to pass in the key/value pairs that will be rendered into the SQL in your file (note that Jinja renders the values verbatim, so string literals such as these dates may need quoting in the SQL file):

    get_birth_date = PostgresOperator(
        task_id="get_birth_date",
        postgres_conn_id="postgres_default",
        sql="sql/birth_date.sql",
        params={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
    )
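
    A minimal DAG wiring this together might look like the sketch below; the dag_id, schedule, and start date are placeholders, the import path is the one from the Airflow 2 postgres provider, and the sql path is resolved relative to the DAG file's folder:

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.postgres.operators.postgres import PostgresOperator

    with DAG(
        dag_id="pet_example",  # placeholder dag_id
        start_date=datetime(2021, 1, 1),  # placeholder start date
        schedule_interval=None,
        catchup=False,
    ) as dag:
        get_birth_date = PostgresOperator(
            task_id="get_birth_date",
            postgres_conn_id="postgres_default",
            # resolved relative to this DAG file, i.e. dags/sql/birth_date.sql
            sql="sql/birth_date.sql",
            params={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
        )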
    

    Edit

    If you want to do it inline, without using the file, just use any string interpolation mechanism (a note on safer value binding follows the last example):

    sql="""select * from table_{} where id > {} """.format(mytable,myID)
    

    or

    sql=f"""select * from table_{table_name} where id > {myID} """
    

    or, if you want to use Jinja, taking advantage of any of the default template variables, such as a param provided when the DAG was triggered, you could do it like this (inside an f-string, literal Jinja braces have to be doubled, hence the four braces below):

    sql=f"""select * from table_{{ params.param1 }} where id > {myID} """