Tags: python-3.x, postgresql, plpgsql, psycopg2

How to run a PostgreSQL query that starts with "DO $$" in Python


I have a PostgreSQL query that starts with DO:

do
$$
DECLARE
    temprow record;
BEGIN
    for temprow in
        select *
        from generate_series(1, 100)
        where generate_series % 2 = 0
    loop
        with cte_input(val) as (select val from (values (temprow.generate_series)) as t(val))
        insert
        into tmp_table(input_value, value_100)
        select cte_input.val as input_value, cte_input.val::float / 100 as value_100
        from cte_input;
        commit;
    end loop;
END
$$ LANGUAGE plpgsql;

How can I run this query with Python and psycopg2? Is a temporary function the right approach if I need to run this query several times with some dynamic changes?

UPD

Thank you @erwin-brandstetter for the information about COMMIT. I removed COMMIT from the query block and added it to the Python code instead: ps_cursor.execute('COMMIT').

I wrote the code this way:


    import concurrent.futures
    import psycopg2 as pg
    from psycopg2 import pool
    
    features = [(1, name_of_feature_1), ...] # list of features
    
    list_query = []
    
    for feature in features:
        feature_id = feature[0]
        name_feature = feature[1]
        query = f"""--Feature:{feature_id}
        create or replace procedure pg_temp.proc_feature_{feature_id}_values()
        language plpgsql
        as
        
    $$
        DECLARE
            temprow record;
        BEGIN
            for temprow in
                select *
                from tmp_maternal_sample
                where maternal_sample = 1000
                loop
                     insert
                     into tmp_feature_values(feature_id, 
                                             feature_values_array,
                                             maternal_sample)
                     select feature_id,
                           array_agg(t_rank.{name_feature}) f_values,
                           temprow.maternal_sample
                     from t_rank
                             ....
                             ....
    
                end loop;
    end
    $$;
    call pg_temp.proc_feature_{feature_id}_values();
    """

        list_query.append(query)
    
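    def load_query(query):
        # getconn() raises PoolError when the pool is exhausted; it never returns None
        ps_connection = threaded_postgreSQL_pool.getconn()
        try:
            print(f"Successfully received connection from connection pool for Query {query[:15]} ")
            ps_cursor = ps_connection.cursor()
            ps_cursor.execute(query)
            ps_cursor.execute('COMMIT')
            ps_cursor.close()
            result = f'Query {query[:15]} finished'
            print(result)
            return result
        finally:
            # Always hand the connection back, otherwise the pool runs dry
            threaded_postgreSQL_pool.putconn(ps_connection)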
    
    try:
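        # Connection settings must be keyword arguments; they are forwarded to psycopg2.connect()
        threaded_postgreSQL_pool = pool.ThreadedConnectionPool(1, 32, user=user, password=password, host=host, port=port, database=database)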
        if (threaded_postgreSQL_pool):
            print("Connection pool created successfully using ThreadedConnectionPool")
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=32) as executor:
            future_to_sql = {executor.submit(load_query, query): query for query in list_query}
            for future in concurrent.futures.as_completed(future_to_sql):
                sql = future_to_sql[future]
                try:
                    data = future.result()
                    
                except Exception as exc:
                    print('%s generated an exception: %s' % (sql[:15], exc))
                else:
                    print('%s returned: %s' % (sql[:15], data))
    
    except (Exception, pg.DatabaseError) as error:
        print("Error while connecting to PostgreSQL", error)
    
    finally:
        if threaded_postgreSQL_pool:
            threaded_postgreSQL_pool.closeall()  # the call parentheses were missing, so nothing was closed
        print('Threaded PG connection pool is closed')


Solution

  • It's safe to assume Postgres 11 or later, because transaction control (COMMIT) inside a DO block is only possible since Postgres 11.

    Your DO statement is convoluted for no obvious reason. Simpler:

    DO
    LANGUAGE plpgsql
    $do$
    DECLARE
       i int;
    BEGIN
       FOR i IN
          SELECT generate_series(2, 100, 2)
       LOOP
          INSERT INTO tmp_table(input_value, value_100)
          VALUES (i, i::float / 100);
          -- COMMIT;  -- ?
       END LOOP;
    END
    $do$;
    

    Which boils down to just this - even including the creation of that temp table:

    CREATE TEMP TABLE tmp_table AS
    SELECT g AS input_value, g::float / 100 AS value_100
    FROM   generate_series(2, 100, 2) g;
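
    To see why the loop collapses into one set-based statement, here is a minimal sketch in plain Python (standing in for the SQL, no database involved) that builds the rows both ways and compares them. The function names are hypothetical and only serve the illustration.

```python
def rows_from_filtered_loop():
    """Mimic the original DO block: walk 1..100, keep even values, add one row per iteration."""
    rows = []
    for n in range(1, 101):            # generate_series(1, 100)
        if n % 2 == 0:                 # where generate_series % 2 = 0
            rows.append((n, n / 100))  # (input_value, value_100)
    return rows


def rows_from_stepped_series():
    """Mimic generate_series(2, 100, 2) feeding a single SELECT."""
    return [(g, g / 100) for g in range(2, 101, 2)]


loop_rows = rows_from_filtered_loop()
set_rows = rows_from_stepped_series()

print(loop_rows == set_rows)  # True
print(len(set_rows))          # 50
```

    Both variants yield the same 50 rows, which is why the row-by-row loop adds nothing here.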
    


    Some setups (like dbfiddle.uk) still don't allow transaction handling with COMMIT. I'm not sure you even need it.

    Either way, just execute the raw SQL.
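
    A minimal sketch of "execute the raw SQL" from Python, assuming psycopg2 and an existing tmp_table. The helper run_raw_sql, the main function, and the DSN "dbname=test" are hypothetical names chosen for this illustration. The COMMIT is issued from Python rather than inside the DO block, as in the question's update.

```python
DO_BLOCK = """
DO
LANGUAGE plpgsql
$do$
DECLARE
   i int;
BEGIN
   FOR i IN
      SELECT generate_series(2, 100, 2)
   LOOP
      INSERT INTO tmp_table(input_value, value_100)
      VALUES (i, i::float / 100);
   END LOOP;
END
$do$;
"""


def run_raw_sql(conn, sql):
    """Execute one raw SQL string on a DB-API connection, then commit."""
    cur = conn.cursor()
    try:
        # No parameters are passed, so the string is sent to the server as-is.
        cur.execute(sql)
    finally:
        cur.close()
    conn.commit()


def main():
    # Imported lazily so the helper above has no hard dependency on psycopg2.
    import psycopg2

    conn = psycopg2.connect("dbname=test")  # hypothetical DSN, adjust to your setup
    try:
        run_raw_sql(conn, DO_BLOCK)
    finally:
        conn.close()
```

    Calling main() requires a reachable database in which tmp_table already exists. The DO block is passed as an ordinary string; the dollar-quoting needs no special treatment on the Python side.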