Search code examples
pythonpostgresqlpsycopg2

Bulk update of rows in Postgres DB using psycopg2


We need to do bulk updates of many rows in our Postgres DB, and want to use the SQL syntax below. How do we do that using psycopg2?

UPDATE table_to_be_updated
SET msg = update_payload.msg
FROM (VALUES %(update_payload)s) AS update_payload(id, msg)
WHERE table_to_be_updated.id = update_payload.id
RETURNING *

Attempt 1 - Passing values

We need to pass a nested iterable format to the psycopg2 query. For the update_payload, I've tried passing a list of lists, list of tuples, and tuples of tuples. It all fails with various errors.

Attempt 2 - Writing custom class with __conform__

I've tried to write a custom class that we can use for these operations, which would return

(VALUES (row1_col1, row1_col2), (row2_col1, row2_col2), (...))

I've coded up like this following instructions here, but it's clear that I'm doing something wrong. For instance, in this approach I'll have to handle quoting of all values inside the table, which would be cumbersome and prone to errors.

class ValuesTable(list):
    def __init__(self, *args, **kwargs):
        super(ValuesTable, self).__init__(*args, **kwargs)

    def __repr__(self):
        data_in_sql = ""
        for row in self:
            str_values = ", ".join([str(value) for value in row])
            data_in_sql += "({})".format(str_values)
        return "(VALUES {})".format(data_in_sql)

    def __conform__(self, proto):
        return self.__repr__()

    def getquoted(self):
        return self.__repr__()

    def __str__(self):
        return self.__repr__()

EDIT: If doing a bulk update can be done in a faster/cleaner way using another syntax than the one in my original question, then I'm all ears!


Solution

  • Requirements:

    • Postgres table, consisting of the fields id and msg (and potentially other fields)
    • Python data containing new values for msg
    • Postgres table should be updated via psycopg2

    Example Table

    CREATE TABLE einstein(
       id CHAR(5) PRIMARY KEY,
       msg VARCHAR(1024) NOT NULL
    );
    

    Test data

    INSERT INTO einstein VALUES ('a', 'empty');
    INSERT INTO einstein VALUES ('b', 'empty');
    INSERT INTO einstein VALUES ('c', 'empty');
    

    Python Program

    Hypothetical, self-contained example program with quotations of a famous physicist.

    import sys
    import psycopg2
    from psycopg2.extras import execute_values
    
    
    def print_table(con):
        cur = con.cursor()
        cur.execute("SELECT * FROM einstein")
        rows = cur.fetchall()
        for row in rows:
            print(f"{row[0]} {row[1]}")
    
    
    def update(con, einstein_quotes):
        cur = con.cursor()
        execute_values(cur, """UPDATE einstein 
                               SET msg = update_payload.msg 
                               FROM (VALUES %s) AS update_payload (id, msg) 
                               WHERE einstein.id = update_payload.id""", einstein_quotes)
        con.commit()
    
    
    def main():
        con = None
        einstein_quotes = [("a", "Few are those who see with their own eyes and feel with their own hearts."),
                           ("b", "I have no special talent. I am only passionately curious."),
                           ("c", "Life is like riding a bicycle. To keep your balance you must keep moving.")]
    
        try:
            con = psycopg2.connect("dbname='stephan' user='stephan' host='localhost' password=''")
            print_table(con)
            update(con, einstein_quotes)
            print("rows updated:")
            print_table(con)
    
        except psycopg2.DatabaseError as e:
    
            print(f'Error {e}')
            sys.exit(1)
    
        finally:
    
            if con:
                con.close()
    
    
    if __name__ == '__main__':
        main()
    

    Prepared Statements Alternative

    import sys
    import psycopg2
    from psycopg2.extras import execute_batch
    
    
    def print_table(con):
        cur = con.cursor()
        cur.execute("SELECT * FROM einstein")
        rows = cur.fetchall()
        for row in rows:
            print(f"{row[0]} {row[1]}")
    
    
    def update(con, einstein_quotes, page_size):
        cur = con.cursor()
        cur.execute("PREPARE updateStmt AS UPDATE einstein SET msg=$1 WHERE id=$2")
        execute_batch(cur, "EXECUTE updateStmt (%(msg)s, %(id)s)", einstein_quotes, page_size=page_size)
        cur.execute("DEALLOCATE updateStmt")
        con.commit()
    
    
    def main():
        con = None
        einstein_quotes = ({"id": "a", "msg": "Few are those who see with their own eyes and feel with their own hearts."},
                           {"id": "b", "msg": "I have no special talent. I am only passionately curious."},
                           {"id": "c", "msg": "Life is like riding a bicycle. To keep your balance you must keep moving."})
    
        try:
            con = psycopg2.connect("dbname='stephan' user='stephan' host='localhost' password=''")
            print_table(con)
            update(con, einstein_quotes, 100)  #choose some meaningful page_size here
            print("rows updated:")
            print_table(con)
    
        except psycopg2.DatabaseError as e:
    
            print(f'Error {e}')
            sys.exit(1)
    
        finally:
    
            if con:
                con.close()
    
    
    if __name__ == '__main__':
        main()
    

    Output

    The above program would output the following to the debug console:

    a     empty
    b     empty
    c     empty
    rows updated:
    a     Few are those who see with their own eyes and feel with their own hearts.
    b     I have no special talent. I am only passionately curious.
    c     Life is like riding a bicycle. To keep your balance you must keep moving.