I am trying to load 2,800 CSVs into RDS Postgres using the COPY command. My program below is loosely based on this; it (1) lists all the S3 objects, (2) creates a table in Postgres, and (3) attempts to COPY a single file into that table as a proof of concept.
import boto3
import psycopg2
S3_BUCKET = "arapbi"
S3_FOLDER = "polygon/tickers/"
s3 = boto3.resource("s3")
my_bucket = s3.Bucket(S3_BUCKET)
object_list = []
for obj in my_bucket.objects.filter(Prefix=S3_FOLDER):
    object_list.append(obj)
conn_string = "postgresql://user:pass@db.address.us-west-2.rds.amazonaws.com:5432/arapbi"
def write_sql(file):
    sql = f"""
    COPY tickers
    FROM '{file}'
    DELIMITER ',' CSV;
    """
    return sql
table_create_sql = """
CREATE TABLE IF NOT EXISTS public.tickers (
    ticker varchar(20),
    timestamp timestamp,
    open double precision,
    close double precision,
    volume_weighted_average_price double precision,
    volume double precision,
    transactions double precision,
    date date
)"""
# Create the table
pg_conn = psycopg2.connect(conn_string, database="arapbi")
cur = pg_conn.cursor()
cur.execute(table_create_sql)
pg_conn.commit()
cur.close()
pg_conn.close()
# attempt to upload one file to the table
sql_copy = write_sql(object_list[-1].key)
pg_conn = psycopg2.connect(conn_string, database="arapbi")
cur = pg_conn.cursor()
cur.execute(sql_copy)
pg_conn.commit()
cur.close()
pg_conn.close()
sql_copy, in this case, is:
COPY tickers
FROM 'polygon/tickers/dt=2023-04-24/2023-04-24.csv'
DELIMITER ',' CSV;
When I run the part that's supposed to COPY a file into Postgres, I get the following error:
FeatureNotSupported: COPY from a file is not supported
HINT: Anyone can COPY to stdout or from stdin. psql's \copy command also works for anyone.
The only other example of this I could find online was another question with the same error (FeatureNotSupported: COPY from a file is not supported), which didn't get resolved.
I'm still working on this and will update with an answer if I get there first. I'm curious whether anyone else has a workload like mine (copying CSVs from S3 into RDS Postgres) and how they resolved it.
Huge thanks to both Adrian Klaver and Thorn for pointing me in the right direction. Just for the sake of completeness, I've added my new updated solution below:
conn_string = f"postgresql://{user}:{password}@arapbi20240406153310133100000001.c368i8aq0xtu.us-west-2.rds.amazonaws.com:5432/arapbi"
pg_conn = psycopg2.connect(conn_string, database="arapbi")
for i, file in enumerate(object_list[]):
    cur = pg_conn.cursor()
    output = StringIO()
    obj = object_list[i].key
    bucket_name = object_list[i].bucket_name
    # read the CSV from S3, drop the index column, and re-serialize it
    # into an in-memory buffer without a header row
    df = pd.read_csv(f"s3a://{bucket_name}/{obj}").drop("Unnamed: 0", axis=1)
    output.write(df.to_csv(index=False, header=False, na_rep='NaN'))
    output.seek(0)
cur.copy_expert(f"COPY tickers FROM STDIN WITH CSV HEADER", output)
pg_conn.commit()
cur.close()
n_records = str(df.count())
print(f"loaded {n_records} records from s3a://{bucket_name}/{obj}")
pg_conn.close()
The Postgres COPY command relies on the files being accessible to the server. Usually that means the files are on the server itself, so using COPY remotely with a file path doesn't work. RDS can't access the S3 file path, so the COPY command fails (in fact, I suspect server-side file COPY is disabled on RDS). Copying from STDIN is still supported, as the error message indicates:
HINT: Anyone can COPY to stdout or from stdin.
https://www.postgresql.org/docs/current/sql-copy.html
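For illustration, here is a minimal sketch of that STDIN route using psycopg2's copy_expert (the file name and connection string are hypothetical, and it assumes a local CSV whose columns already match the tickers table):

import psycopg2

pg_conn = psycopg2.connect("postgresql://user:pass@db.address.us-west-2.rds.amazonaws.com:5432/arapbi")
cur = pg_conn.cursor()
# FROM STDIN means the client supplies the data, so the server never
# has to open a file path itself
with open("2023-04-24.csv") as f:  # hypothetical local copy of one CSV
    cur.copy_expert("COPY tickers FROM STDIN WITH (FORMAT csv, HEADER)", f)
pg_conn.commit()
cur.close()
pg_conn.close()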
As mentioned in @Adrian Klaver's comment, psycopg has copy helper functions for this: copy_from and copy_expert both take a file-like object (such as an open CSV file) as an argument.
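For completeness, a rough sketch of the copy_from variant (again with a hypothetical file name, reusing conn_string from above; note that copy_from expects plain delimited text without quoted fields, so copy_expert with FORMAT csv is usually the safer choice for real-world CSVs):

pg_conn = psycopg2.connect(conn_string, database="arapbi")
with pg_conn.cursor() as cur, open("2023-04-24.csv") as f:
    next(f)  # skip the header row, since copy_from has no HEADER option
    # copy_from streams the rows over STDIN, just like copy_expert
    cur.copy_from(f, "tickers", sep=",")
pg_conn.commit()
pg_conn.close()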