I'm retrieving data from an API and converting the data into a pandas dataframe. I'm using python-snowflake connector to send this data into my snowflake schema as a table.
I want to use merge instead of sending the duplicate data into my snowflake table.
Sample data I'm retrieving from API:
|------------|-------------|------------|
| log_in_ID | user_id | date |
|------------|-------------|------------|
| 1 | 21 | 02/21/2021 |
| 2 | 22 | 02/24/2021 |
| 3 | 23 | 02/27/2021 |
| 4 | 21 | 02/29/2021 |
|------------|-------------|------------|
The log_in_ID is unique
Here is my code:
import requests
import json
import snowflake.connector
import pandas as pd
from sqlalchemy import create_engine
engine = create_engine(URL(
account='my_snowflake_account',
user='user',
password='password',
database='my_database'
schema='my_schema',
warehouse='warehouse',
role='ADMIN'))
pandas_df = 'Some code to get API data and convert into pandas dataframe'
def send_to_snowflake(pandas_df, target_table):
connect = engine.connect()
data.tosql(target_table, con=engine, index=False, if_exists='append')
connection.close()
engine.dispose()
if __name__ == "__main__":
send_to_snowflake(pandas_df, target_table)
How can I use merge statement with log_in_id as unique key?
How can I use a pandas dataframe inside the merge query in snowflake-python?
merge into target_table using {pandas_dataframe}
on target_table.log_in_id = {pandas_dataframe}.log_in_id
when matched then
update set target_table.user_id = {pandas_dataframe}.user_id and
set target_table.date = {pandas_dataframe}.date
If structure of your API is similar to below format: [(1, 21, 'A'), (2, 22, 'AA'), (3, 23, 'AB'), (4, 21, 'AC')]
This code will works to merge the API data into snowflake target table without loading source data into tables:
import requests
import json
import snowflake.connector
import pandas as pd
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL
def sample_func():
engine = create_engine(URL(
account='xxx',
user='xxx',
password='xxx',
database='xxx',
schema='PUBLIC',
warehouse='COMPUTE_WH',
role='xxx',
))
connection = engine.connect()
pandas_df = 'select * from A'
try:
cursor_return = connection.execute(pandas_df)
cursor_result = cursor_return.fetchall()
api_data = str(cursor_result)[1:-1]
print(api_data)
merge_temp = """
merge into B target_table using (select COLUMN1,COLUMN2,COLUMN3 from values{0}) src
on target_table.log_in_id = src.COLUMN1
when matched then
update set target_table.log_in_id = src.COLUMN1,
target_table.user_id = src.COLUMN2,
target_table.test_data = src.COLUMN3
when not matched then
insert
(log_in_id, user_id, test_data) values(src.COLUMN1, src.COLUMN2, src.COLUMN3)
""".format(str(api_data))
print(merge_temp)
c_return = connection.execute(merge_temp)
c_result = c_return.fetchall()
print(c_result)
print("Number rows inserted: {0} || Number of rows updated: {1}".format(str(c_result[0][0]), str(c_result[0][1])))
finally:
connection.close()
engine.dispose()
sample_func()
But I would recommend to load your API data into temp table and use merge statement on temp table, this approach will more faster then loading it from dataframe or csv files.