Storing in PostgreSQL using Python Polars


I want to store a DataFrame loaded from a Parquet file in PostgreSQL using Polars, with this code:

def store_in_postgresql(df):
    # table_name is not defined in this snippet; it is assumed to be set elsewhere in the script
    password = 'anon'
    username = 'postgres'
    database = 'nyc_taxis'
    uri = f'postgresql://{username}:{password}@localhost:5432/{database}'
    # SQLSTATE 42P07 is PostgreSQL's "duplicate_table" error (the table already exists)
    common_sql_state = "SQLSTATE: 42P07"

    try:
        df.write_database(table_name, connection=uri,
                          engine='adbc', if_exists='replace')
        print('loading has been completed!')
    except Exception as e:
        if common_sql_state in str(e):
            # the table already exists, so append to it instead
            df.write_database(table_name, connection=uri,
                              engine='adbc', if_exists='append')
            print('loading has been completed!')
        else:
            print(e)

But I'm getting this error:

INVALID_ARGUMENT: [libpq] Failed to execute COPY statement: PGRES_FATAL_ERROR ERROR:  COPY file signature not recognized
. SQLSTATE: 22P04

The code stores smaller DataFrames, such as one with 4 million rows (200 MB), but when I try to store a bigger one with 18 million rows (500 MB) I get the error above. Is there a way to fix the code, or perhaps slice the DataFrame so that it can be stored in the database? Thank you in advance.


Solution

  • I think I've figured it out: it looks like a memory problem. Since the function works for smaller DataFrames but not for big ones, I wrote a new function that splits the big DataFrame into smaller chunks, which are then passed one at a time to the function above:

    def store_by_chunking(df, num_of_chunks):
        try:
            length = len(df) // num_of_chunks
            # start offset of each chunk, plus the total row count as the final boundary
            list_of_chunks = [i * length for i in range(num_of_chunks)]
            list_of_chunks.append(len(df))

            print(list_of_chunks)

            for i in range(num_of_chunks):
                start, end = list_of_chunks[i], list_of_chunks[i + 1]
                # DataFrame.slice takes (offset, length), not (start, end)
                sliced_df = df.slice(start, end - start)
                store_in_postgresql(sliced_df)
        except Exception as e:
            print(e)
    

    This code has not given me an error so far. I hope it helps people with the same problem.
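
One detail worth double-checking when splitting by row ranges: Polars' `DataFrame.slice(offset, length)` takes a length as its second argument, not an end index, so passing an end index can produce overlapping chunks. Below is a minimal sketch of just the bounds arithmetic, kept in plain Python so it can be checked without a database. The helper name `chunk_bounds` is hypothetical (it is not part of Polars), and spreading the leftover rows over the first chunks is only one possible choice.

```python
def chunk_bounds(n_rows: int, num_chunks: int) -> list[tuple[int, int]]:
    """Return (offset, length) pairs that cover n_rows exactly once.

    Hypothetical helper for illustration; each pair is meant to be
    passed to DataFrame.slice(offset, length).
    """
    if num_chunks < 1:
        raise ValueError("num_chunks must be at least 1")

    base, extra = divmod(n_rows, num_chunks)
    bounds = []
    offset = 0
    for i in range(num_chunks):
        # The first `extra` chunks take one additional row each,
        # so the chunk sizes differ by at most one.
        length = base + (1 if i < extra else 0)
        if length:  # skip empty chunks when num_chunks > n_rows
            bounds.append((offset, length))
        offset += length
    return bounds


# Possible usage with the functions from this thread (not executed here):
#
#     for offset, length in chunk_bounds(len(df), 5):
#         store_in_postgresql(df.slice(offset, length))
```

For example, `chunk_bounds(10, 3)` gives `[(0, 4), (4, 3), (7, 3)]`, so every row lands in exactly one chunk.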