How to template a big PostgreSQL query with a temporary table in Python?


Let's say I have a long, extremely complicated Postgres script (daily_pipeline.sql) that performs many inserts, updates, statistics calculations, etc.

The input to this file would be a table of new data that is received every day via a data dump.

The strategy I think would work best is a templated SQL file like this:

create temp table todays_data as (
  %s
);

-- do all my inserts

-- do all my updates

-- do all my calculations

Then in Python I'd read the new dataset with something like pandas and format it into the todays_data temporary table in this template. I'd write the formatted result to a new file, daily_pipeline_formatted.sql, and execute that file from the command line using psql -f daily_pipeline_formatted.sql.

I've looked into templating with something like Jinja, but I couldn't find much about templating an entire temporary table.

Does anybody have any direction on how to do this/what resources I can use?


Solution

  • A rough draft:

    # Contents of csv_test.csv (as shown by `cat csv_test.csv`):
    #   1, 1
    #   3, 4

    from datetime import date

    import psycopg2
    from psycopg2 import sql

    con = psycopg2.connect(dbname="test", host="localhost", user="postgres", port=5432)
    cur = con.cursor()

    # Date-stamped name of the form YYYY-MM-DD-tmp_table. The hyphens are only
    # valid because sql.Identifier double-quotes the name.
    tbl_name = date.today().isoformat() + "-" + "tmp_table"

    # The temp table exists only for the current session (connection).
    tbl_sql = sql.SQL("CREATE TEMP TABLE {} (id integer, fld_1 integer)").format(
        sql.Identifier(tbl_name)
    )
    cur.execute(tbl_sql)

    # Stream the CSV file into the temp table with COPY ... FROM STDIN.
    cp_sql = sql.SQL("COPY {} FROM STDIN WITH CSV").format(sql.Identifier(tbl_name))
    with open("/home/aklaver/csv_test.csv") as csv_file:
        cur.copy_expert(cp_sql, csv_file)

    cur.execute(sql.SQL("select * from {}").format(sql.Identifier(tbl_name)))
    print(cur.fetchall())
    # [(1, 1), (3, 4)]
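
  • One possible way to tie the draft above back to daily_pipeline.sql, as a sketch only. It assumes the pipeline file no longer contains the create temp table ... %s part and simply expects a table called todays_data to exist, that the file holds plain SQL (no psql meta-commands such as \copy), and that the data has the same two integer columns as the draft. The helper names rows_to_csv_buffer and run_daily_pipeline are made up for illustration. The key point is that a temp table is visible only to the connection that created it, so the pipeline has to run on that same connection rather than in a separate psql process:

```python
import csv
import io

# Assumed fixed table name and columns; adjust to the real data dump.
TABLE_DDL = "CREATE TEMP TABLE todays_data (id integer, fld_1 integer)"
COPY_SQL = "COPY todays_data FROM STDIN WITH CSV"


def rows_to_csv_buffer(rows):
    """Serialize an iterable of row tuples into an in-memory CSV file object."""
    buf = io.StringIO()
    csv.writer(buf, lineterminator="\n").writerows(rows)
    buf.seek(0)  # rewind so COPY reads from the beginning
    return buf


def run_daily_pipeline(cur, rows, pipeline_sql):
    """Load rows into todays_data, then run the pipeline on the same cursor.

    `cur` is expected to behave like a psycopg2 cursor (execute, copy_expert).
    """
    cur.execute(TABLE_DDL)
    cur.copy_expert(COPY_SQL, rows_to_csv_buffer(rows))
    # Same session as the temp table, so the pipeline can see todays_data.
    cur.execute(pipeline_sql)
```

    With the connection from the draft, this could be called as run_daily_pipeline(con.cursor(), rows, open("daily_pipeline.sql").read()), followed by con.commit(). If the data is already in a pandas DataFrame, df.to_csv(buf, index=False, header=False) should be able to fill the buffer in place of the csv module.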