postgresql, dataframe, psycopg2, psycopg3

How to upload pandas data frames fast with psycopg3?


I am trying to build a function which loads large chunks of a data frame into a PostgreSQL table. The chunking etc. is not part of this question, so I didn't include it in the minimal example. This question focuses only on the function copy_chunk, or more precisely on how to use cur.copy. Inspired by this answer and the psycopg documentation, I tried my luck with this function:

import pandas as pd
import psycopg
from io import StringIO

def copy_chunk(
    conn,  # noqa: ANN001
    df_chunk: pd.DataFrame,
    table_name: str,
) -> None:
    """Upload a single chunk to the database using the COPY command."""
    with conn.cursor() as cur:
        # Create a buffer
        buffer = StringIO()
        df_chunk.to_csv(buffer, index=False, header=False)
        buffer.seek(0)

        # Load data into the table using copy method
        with cur.copy(f'COPY "{table_name}" FROM STDIN WITH (FORMAT CSV)') as copy:
            copy.write(buffer)
    conn.commit()

# Example usage
conn_string = "postgresql://username:password@hostname:port/dbname"
df_chunk = pd.DataFrame({
    'col1': [1, 2, 3],
    'col2': ['A', 'B', 'C']
})

# Establish connection
with psycopg.connect(conn_string) as conn:
    copy_chunk(conn, df_chunk, 'your_table_name')

My current problem is that it doesn't throw any exception, but it also doesn't fill up my table. Can you spot my error?


Solution

  • The below worked for me. Note that I used the psycopg sql module to build the SQL dynamically; quoting identifiers that way is safer in the long run:

    import pandas as pd
    import psycopg
    from psycopg import sql
    from io import StringIO
    
    def copy_chunk(
        conn,  # noqa: ANN001
        df_chunk: pd.DataFrame,
        table_name: str,
    ) -> None:
        """Upload a single chunk to the database using the COPY command."""
        with conn.cursor() as cur:
            # Create a buffer
            buffer = StringIO()
            df_chunk.to_csv(buffer, index=False, header=False)
            buffer.seek(0)
            
            copy_sql = sql.SQL('COPY {} FROM STDIN WITH (FORMAT CSV)').format(sql.Identifier(table_name))
            # Load data into the table using copy method
            with buffer as f:
                with cur.copy(copy_sql) as copy:
                    while data := f.read(10):  # tiny chunk size just for illustration
                        copy.write(data)
    
        conn.commit()
    
    # Example usage
    conn_string = "postgresql://postgres:@localhost:5432/test"
    df_chunk = pd.DataFrame({
        'col1': [1, 2, 3],
        'col2': ['A', 'B', 'C']
    })
    
    # Establish connection
    with psycopg.connect(conn_string) as conn:
        copy_chunk(conn, df_chunk, 'pandas_test')
    

    With the result being:

    select * from pandas_test ;
     col1 | col2 
    ------+------
        1 | A
        2 | B
        3 | C
    (3 rows)
    

    Without the with buffer as f: I got this error:

    object of type '_io.StringIO' has no len()

    Using with buffer as f: and reading it in chunks sends string data to the COPY query. copy.write() accepts str or bytes, but not a file-like object such as the StringIO itself — which is what the original code passed.
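
    The mechanics can be seen without a database: the loop feeds copy.write() fixed-size str chunks rather than the StringIO object itself. A minimal stdlib-only illustration of that read loop:

```python
from io import StringIO

# Simulate the buffer produced by df_chunk.to_csv(...)
buffer = StringIO("1,A\n2,B\n3,C\n")

# Stand-in for the Copy object: collect the str chunks that
# copy.write(data) would receive inside the loop
received = []

with buffer as f:              # also closes the StringIO on exit
    while data := f.read(10):  # read string chunks of up to 10 chars
        received.append(data)

# The chunks reassemble into the full CSV payload
assert "".join(received) == "1,A\n2,B\n3,C\n"
```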