Hi, I have some Python code where I read data from a PostgreSQL table (the schemas of the tables in a database), do some transformations, and then write data back to a PostgreSQL table in another schema, so the same database but a different schema. However, my code is creating the table and writing the data to the same schema.
Below is my code. I am giving the schema name and table name in the variable 'table_name', but that is not helping.
import pandas as pd
import psycopg2
from sqlalchemy import create_engine, JSON
import json
# Function to convert PostgreSQL data types to Snowflake data types
def convert_data_type(pg_data_type):
    conversion_dict = {
        'integer': 'NUMBER',
        'bigint': 'NUMBER',
        'smallint': 'NUMBER',
        'serial': 'NUMBER',
        'bigserial': 'NUMBER',
        'decimal': 'NUMBER',
        'numeric': 'NUMBER',
        'real': 'FLOAT',
        'double precision': 'FLOAT',
        'money': 'FLOAT',
        'character varying': 'VARCHAR',
        'varchar': 'VARCHAR',
        'character': 'CHAR',
        'char': 'CHAR',
        'text': 'STRING',
        'bytea': 'BINARY',
        'timestamp without time zone': 'TIMESTAMP_NTZ',
        'timestamp with time zone': 'TIMESTAMP_TZ',
        'date': 'DATE',
        'time without time zone': 'TIME_NTZ',
        'time with time zone': 'TIME_TZ',
        'boolean': 'BOOLEAN'
    }
    return conversion_dict.get(pg_data_type, pg_data_type)
pg_dbname = "my_db_name"
pg_user = "my_user"
pg_password = "My_pw"
pg_host = "My_host"
pg_port = "5432"
# Connect to PostgreSQL database
conn = psycopg2.connect(
    dbname=pg_dbname,
    user=pg_user,
    password=pg_password,
    host=pg_host,
    port=pg_port
)
# Create a cursor object
cur = conn.cursor()
# Query to get table schemas
cur.execute("""
    SELECT table_schema, table_name, column_name, data_type
    FROM information_schema.columns
    WHERE table_schema = 'public'
    ORDER BY table_name
""")
# Fetch all results
results = cur.fetchall()
# Close the cursor and connection
cur.close()
conn.close()
# Process the results and create a DataFrame
data = []
for row in results:
    table_schema, table_name, column_name, data_type = row
    converted_data_type = convert_data_type(data_type)
    data.append([table_schema, table_name, column_name, data_type, converted_data_type])
df = pd.DataFrame(data, columns=['table_schema', 'table_name', 'column_name', 'original_data_type', 'converted_data_type'])
# Grouping data and making dictionary
result = (
    df.groupby(['table_schema', 'table_name'])
    .apply(lambda x: pd.Series({
        'columns': dict(zip(x['column_name'], x['converted_data_type'])),
        'original_columns': dict(zip(x['column_name'], x['original_data_type']))
    }))
    .reset_index()
)
# Create SQLAlchemy engine
engine = create_engine(f'postgresql+psycopg2://{pg_user}:{pg_password}@{pg_host}:{pg_port}/{pg_dbname}')
# Define the table name in the new schema
table_name = 'project_demo.mapping_table'
# Insert the DataFrame into the PostgreSQL table in the new schema
result.to_sql(table_name, engine, if_exists='replace', index=False, dtype={'columns': JSON, 'original_columns': JSON})
print("Data inserted successfully!")
I tried changing the schema name in the variable, but the table still ends up in the public schema.
I googled "pandas to_sql", which pointed me here: https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html. There is a schema keyword argument listed, so just do result.to_sql('mapping_table', engine, schema='project_demo', ...). By passing a dotted schema.table string as the first argument, your current code instead tries to write to a table literally named "project_demo.mapping_table" in the public schema.
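Putting that together, the end of your script would look something like this (a minimal sketch; note that to_sql creates tables but not schemas, so the project_demo schema must already exist in the database):

# Write mapping_table into the project_demo schema instead of public
result.to_sql(
    'mapping_table',        # bare table name, no schema prefix
    engine,
    schema='project_demo',  # the target schema goes here
    if_exists='replace',
    index=False,
    dtype={'columns': JSON, 'original_columns': JSON}
)

If you want to double-check where the table actually landed, you can query information_schema.tables:

import pandas as pd
check = pd.read_sql(
    "SELECT table_schema, table_name FROM information_schema.tables "
    "WHERE table_name = 'mapping_table'",
    engine
)
print(check)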