Tags: pandas, psycopg2, postgresql-9.5

Postgres 9.5 upsert command in pandas or psycopg2?


Most of the examples I see are of people inserting a single row into a database with the ON CONFLICT DO UPDATE syntax.
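For reference, the single-row pattern I keep seeing looks roughly like this (the connection string, table, and values are made up for illustration):

import psycopg2

# Made-up table and values, just to show the single-row ON CONFLICT pattern;
# assumes id is the primary key of dim_example.
conn = psycopg2.connect('dbname=mydb')
with conn, conn.cursor() as cur:
    cur.execute(
        """
        INSERT INTO dim_example (id, name)
        VALUES (%(id)s, %(name)s)
        ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name;
        """,
        {'id': 1, 'name': 'example'},
    )
conn.close()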

Does anyone have any examples using SQLAlchemy or pandas.to_sql?

99% of my inserts use the psycopg2 COPY command (I save a CSV or StringIO and then bulk insert), and the other 1% use pd.to_sql. All of my logic for checking for new rows or dimension changes is done in Python.

import pandas as pd


def find_new_rows(existing, current, id_col):
    """Return the rows of `current` that are not already present in `existing`."""
    current[id_col] = current[id_col].astype(int)
    # Compare on datetime + id, carrying one key column from each frame.
    x = existing[['datetime', id_col, 'key1']]
    y = current[['datetime', id_col, 'key2']]
    final = pd.merge(y, x, how='left', on=['datetime', id_col])
    # Keep rows whose keys differ (or that have no match in `existing`).
    final = final[~(final['key2'] == final['key1'])]
    final = final.drop(['key1'], axis=1)
    # Merge the flagged rows back; the duplicate key2 column becomes _x/_y.
    current = pd.merge(current, final, how='left', on=['datetime', id_col])
    current = current.loc[current['key2_y'] == 1]
    current.drop(['key2_x', 'key2_y'], axis=1, inplace=True)
    return current
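
For context, the COPY path I mentioned above is roughly this sketch (the table name and DataFrame are placeholders, and it assumes the CSV column order matches the table):

import io


def copy_from_dataframe(df, table, conn):
    """Bulk-load a DataFrame over an open psycopg2 connection via COPY."""
    buf = io.StringIO()
    df.to_csv(buf, index=False, header=False)
    buf.seek(0)
    with conn.cursor() as cur:
        cur.copy_expert(f'COPY {table} FROM STDIN WITH (FORMAT csv)', buf)
    conn.commit()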

Can someone show me an example of using the new PostgreSQL syntax for upsert with psycopg2? A common use case is checking for dimension changes (between 50k and 100k rows daily, which I compare against existing values); that is an ON CONFLICT DO NOTHING case, since I only want to add new rows.
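
For that dimension case, something like the following is what I have in mind (the table and column names are placeholders, and it assumes a unique constraint on (datetime, dim_id)):

def insert_new_dimensions(conn, rows):
    """Insert only the rows whose (datetime, dim_id) pair is not already present."""
    insert_sql = """
        INSERT INTO dim_table (datetime, dim_id, key1)
        VALUES (%(datetime)s, %(dim_id)s, %(key1)s)
        ON CONFLICT (datetime, dim_id) DO NOTHING;
    """
    with conn.cursor() as cur:
        cur.executemany(insert_sql, rows)  # rows: list of dicts keyed by column name
    conn.commit()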

Another use case is fact data that changes over time. I only take the most recent value (currently via a view that selects distinct), but it would be better to upsert, if possible.
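
For the fact case, I imagine something along these lines, though I have not tested it against my schema (the table and columns are made up); the WHERE clause only overwrites a row when the incoming value is newer:

# Hypothetical fact table; existing rows are only updated by newer data.
upsert_sql = """
    INSERT INTO facts (fact_id, measured_at, value)
    VALUES (%(fact_id)s, %(measured_at)s, %(value)s)
    ON CONFLICT (fact_id)
    DO UPDATE SET measured_at = EXCLUDED.measured_at,
                  value = EXCLUDED.value
    WHERE facts.measured_at < EXCLUDED.measured_at;
"""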


Solution

  • FYI, this is the solution I am currently using.

    It seems to work fine for my purposes. I had to add a line that replaces null (NaT) timestamps with None, though, because I was getting an error when loading each row into the database.

    import logging

    import pandas as pd


    # DATABASE_COLUMNS and PRIMARY_KEY are module-level lists of column names.
    def create_update_query(table):
        """Create an upsert query that replaces existing data on primary-key conflicts."""
        columns = ', '.join(DATABASE_COLUMNS)
        constraint = ', '.join(PRIMARY_KEY)
        placeholder = ', '.join([f'%({col})s' for col in DATABASE_COLUMNS])
        updates = ', '.join([f'{col} = EXCLUDED.{col}' for col in DATABASE_COLUMNS])
        query = f"""INSERT INTO {table} ({columns})
                    VALUES ({placeholder})
                    ON CONFLICT ({constraint})
                    DO UPDATE SET {updates};"""
        # Collapse the multi-line string into a single line of SQL.
        query = ' '.join(query.split())
        return query
    
    
    def load_updates(df, table, connection):
        """Upsert each row of `df` into `table` using the query built above."""
        conn = connection.get_conn()
        cursor = conn.cursor()
        # Replace NaN/NaT with None so psycopg2 inserts NULLs.
        df1 = df.where((pd.notnull(df)), None)
        insert_values = df1.to_dict(orient='records')
        for row in insert_values:
            cursor.execute(create_update_query(table=table), row)
            conn.commit()  # commit after each row
        row_count = len(insert_values)
        logging.info(f'Inserted {row_count} rows.')
        cursor.close()
        conn.close()
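
    If the row-by-row execute/commit loop ever becomes a bottleneck, a possible variant is to send the same query in batches with psycopg2.extras.execute_batch. This is an untested sketch that assumes the same DATABASE_COLUMNS/PRIMARY_KEY constants and connection wrapper as above:

    from psycopg2.extras import execute_batch


    def load_updates_batched(df, table, connection, page_size=500):
        """Untested sketch: same upsert as load_updates, sent in batches."""
        conn = connection.get_conn()
        with conn.cursor() as cursor:
            df1 = df.where((pd.notnull(df)), None)
            rows = df1.to_dict(orient='records')
            execute_batch(cursor, create_update_query(table), rows, page_size=page_size)
            conn.commit()
        logging.info(f'Upserted {len(rows)} rows.')
        conn.close()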