I am trying to build a function which loads large chunks of a data frame into a PostgreSQL table. The chunking etc. is not part of this question, so I didn't include it in the minimal example. This question focuses only on the function `copy_chunk`, or more precisely on how to use `cur.copy`. Inspired by this answer and the psycopg documentation, I tried my luck with this function:
import pandas as pd
import psycopg
from io import StringIO
def copy_chunk(
    conn,  # noqa: ANN001
    df_chunk: pd.DataFrame,
    table_name: str,
) -> None:
    """Send one data-frame chunk to ``table_name`` through a COPY operation.

    The chunk is serialised as CSV without header or index into an in-memory
    buffer, the buffer is passed to a ``COPY ... FROM STDIN`` operation opened
    on a cursor of ``conn``, and ``conn`` is committed afterwards.
    """
    with conn.cursor() as cursor:
        # Serialise the chunk to CSV in memory and rewind to the beginning.
        csv_buffer = StringIO()
        df_chunk.to_csv(csv_buffer, index=False, header=False)
        csv_buffer.seek(0)

        copy_statement = f'COPY "{table_name}" FROM STDIN WITH (FORMAT CSV)'
        with cursor.copy(copy_statement) as copy_stream:
            # The buffer object is passed to write() as-is.
            copy_stream.write(csv_buffer)
    conn.commit()
# Example usage: upload a small three-row demo frame.
conn_string = "postgresql://username:password@hostname:port/dbname"
df_chunk = pd.DataFrame(
    {
        "col1": [1, 2, 3],
        "col2": ["A", "B", "C"],
    }
)

# Open the connection for the duration of the upload.
with psycopg.connect(conn_string) as conn:
    copy_chunk(conn, df_chunk, table_name="your_table_name")
My current problem is that it doesn't throw any exception, but it also doesn't fill my table. Can you spot my error?
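The code below worked for me. Note that I used the psycopg `sql` module to build the SQL statement dynamically. This is safer in the long run, because the table name is quoted as an identifier instead of being interpolated into the query string: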
import pandas as pd
import psycopg
from psycopg import sql
from io import StringIO
def copy_chunk(
    conn,  # noqa: ANN001
    df_chunk: pd.DataFrame,
    table_name: str,
    *,
    block_size: int = 65536,
) -> None:
    """Upload a single chunk to the database using the COPY command.

    The chunk is written as CSV (no header, no index) to an in-memory buffer
    and streamed as text blocks to ``COPY <table_name> FROM STDIN``. The
    connection is committed once the COPY has finished.

    Args:
        conn: Connection used to open the cursor and to commit.
        df_chunk: Rows to upload; columns are sent in the frame's order.
        table_name: Target table; quoted as an SQL identifier.
        block_size: Maximum number of characters handed to each
            ``copy.write`` call.

    Raises:
        ValueError: If ``block_size`` is smaller than 1.
    """
    if block_size < 1:
        # read(0) returns "" right away, so the loop below would silently
        # upload nothing.
        raise ValueError(f"block_size must be >= 1, got {block_size}")

    with conn.cursor() as cur:
        # Create a buffer holding the CSV text and rewind it for reading.
        buffer = StringIO()
        df_chunk.to_csv(buffer, index=False, header=False)
        buffer.seek(0)

        # Compose the statement so the table name is quoted as an identifier.
        copy_sql = sql.SQL('COPY {} FROM STDIN WITH (FORMAT CSV)').format(
            sql.Identifier(table_name)
        )

        # Load data into the table using copy method
        with buffer as f:
            with cur.copy(copy_sql) as copy:
                # copy.write() needs the text itself, not the file object, so
                # read the buffer block by block. Large blocks keep the number
                # of write calls low.
                while data := f.read(block_size):
                    copy.write(data)
    conn.commit()
# Example usage: upload a small three-row demo frame.
conn_string = "postgresql://postgres:@localhost:5432/test"
df_chunk = pd.DataFrame(
    {
        "col1": [1, 2, 3],
        "col2": ["A", "B", "C"],
    }
)

# Open the connection for the duration of the upload.
with psycopg.connect(conn_string) as conn:
    copy_chunk(conn, df_chunk, table_name="pandas_test")
With the result being:
select * from pandas_test ;
col1 | col2
------+------
1 | A
2 | B
3 | C
(3 rows)
Without the `with buffer as f:` block I got this error:
object of type '_io.StringIO' has no len()
Using `with buffer as f:` and reading from `f` in a loop iterates over the 'file' and sends the data to the COPY query.