
Using MERGE in the Python Snowflake connector with a pandas dataframe as the source


I'm retrieving data from an API and converting it into a pandas dataframe. I'm using the Python Snowflake connector to send this data into my Snowflake schema as a table.

I want to use MERGE instead of appending duplicate data to my Snowflake table.

Sample data I'm retrieving from API:

|------------|-------------|------------|
|  log_in_ID |   user_id   |   date     |
|------------|-------------|------------|
|    1       |     21      | 02/21/2021 |
|    2       |     22      | 02/24/2021 |
|    3       |     23      | 02/27/2021 |
|    4       |     21      | 02/29/2021 |
|------------|-------------|------------|

The log_in_ID column is unique.

Here is my code:

import requests
import json
import snowflake.connector
import pandas as pd
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL

engine = create_engine(URL(
                 account='my_snowflake_account',
                 user='user',
                 password='password',
                 database='my_database',
                 schema='my_schema',
                 warehouse='warehouse',
                 role='ADMIN'))

pandas_df = 'Some code to get API data and convert into pandas dataframe'

def send_to_snowflake(pandas_df, target_table):
    connection = engine.connect()
    pandas_df.to_sql(target_table, con=engine, index=False, if_exists='append')
    connection.close()
    engine.dispose()

if __name__ == "__main__":
    send_to_snowflake(pandas_df, 'target_table')

How can I use a MERGE statement with log_in_id as the unique key?

How can I use a pandas dataframe inside the MERGE query in snowflake-python?

merge into target_table using {pandas_dataframe}
      on target_table.log_in_id = {pandas_dataframe}.log_in_id
      when matched then
           update set target_table.user_id = {pandas_dataframe}.user_id,
                      target_table.date = {pandas_dataframe}.date

Solution

  • If the structure of your API data is similar to the following format: [(1, 21, 'A'), (2, 22, 'AA'), (3, 23, 'AB'), (4, 21, 'AC')]

    This code will merge the API data into the Snowflake target table without loading the source data into an intermediate table:

    import requests
    import json
    import snowflake.connector
    import pandas as pd
    from sqlalchemy import create_engine
    from snowflake.sqlalchemy import URL
    
    
    def sample_func():
        engine = create_engine(URL(
                         account='xxx',
                         user='xxx',
                         password='xxx',
                         database='xxx',
                         schema='PUBLIC',
                         warehouse='COMPUTE_WH',
                         role='xxx',
        ))
    
        connection = engine.connect()
        # Stand-in for the API call: pull the source rows from table A instead
        source_query = 'select * from A'

        try:
            cursor_return = connection.execute(source_query)
            cursor_result = cursor_return.fetchall()
            # str() of the row list looks like "[(1, 21, 'A'), (2, 22, 'AA'), ...]";
            # stripping the outer brackets leaves a ready-made VALUES list
            api_data = str(cursor_result)[1:-1]
            print(api_data)
            merge_temp = """
            merge into B target_table using (select COLUMN1,COLUMN2,COLUMN3 from values{0}) src
          on target_table.log_in_id = src.COLUMN1
          when matched then
               update set target_table.log_in_id = src.COLUMN1,
                          target_table.user_id = src.COLUMN2,
                          target_table.test_data = src.COLUMN3
          when not matched then
          insert
          (log_in_id, user_id, test_data) values(src.COLUMN1, src.COLUMN2, src.COLUMN3)
            """.format(str(api_data))
            print(merge_temp)
            c_return = connection.execute(merge_temp)
            c_result = c_return.fetchall()
            print(c_result)

            # Snowflake's MERGE returns one row with the insert and update counts
            print("Number of rows inserted: {0} || Number of rows updated: {1}".format(str(c_result[0][0]), str(c_result[0][1])))
        finally:
            connection.close()
            engine.dispose()
    
    sample_func()
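
    The same idea works when the source rows live in a pandas dataframe instead of a table: render the dataframe's rows into the VALUES list yourself. Here is a minimal sketch of that, assuming the engine and dataframe from the question (build_values_clause is a hypothetical helper, and this kind of naive string formatting is only safe for trusted data):

    def build_values_clause(df):
        # Hypothetical helper: format each dataframe row as a SQL tuple
        # literal, e.g. (1, 21, '02/21/2021'), (2, 22, '02/24/2021')
        rows = []
        for row in df.itertuples(index=False):
            values = ", ".join(
                "'{}'".format(v) if isinstance(v, str) else str(v) for v in row
            )
            rows.append("({})".format(values))
        return ", ".join(rows)

    merge_temp = """
    merge into target_table
    using (select COLUMN1, COLUMN2, COLUMN3 from values {0}) src
       on target_table.log_in_id = src.COLUMN1
     when matched then update set
          target_table.user_id = src.COLUMN2,
          target_table.date = src.COLUMN3
     when not matched then insert
          (log_in_id, user_id, date) values (src.COLUMN1, src.COLUMN2, src.COLUMN3)
    """.format(build_values_clause(pandas_df))

    connection = engine.connect()
    try:
        connection.execute(merge_temp)
    finally:
        connection.close()

    String-building like this is fine for small, trusted batches; for anything user-supplied, bind variables or the temporary-table route below are the safer choice.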
    

    But I would recommend loading your API data into a temporary table and running the MERGE against that table; this approach will be faster than building the VALUES list from a dataframe or CSV file. A sketch of that pattern follows.
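
    A minimal sketch of that temp-table pattern, assuming a raw snowflake-connector-python connection and the three columns from the question (connection parameters and table names are illustrative):

    import pandas as pd
    import snowflake.connector
    from snowflake.connector.pandas_tools import write_pandas

    conn = snowflake.connector.connect(
        account='my_snowflake_account', user='user', password='password',
        database='my_database', schema='my_schema', warehouse='warehouse',
    )

    # Uppercase column names so they match the unquoted table definition below
    df = pd.DataFrame({'LOG_IN_ID': [1, 2],
                       'USER_ID': [21, 22],
                       'DATE': ['02/21/2021', '02/24/2021']})

    cur = conn.cursor()
    try:
        # Stage the API data in a temporary table scoped to this session
        cur.execute("""
            create temporary table stg_log_ins
                (log_in_id integer, user_id integer, date varchar)
        """)
        write_pandas(conn, df, 'STG_LOG_INS')

        # Merge the staged rows into the target on the unique key
        cur.execute("""
            merge into target_table t
            using stg_log_ins s
               on t.log_in_id = s.log_in_id
             when matched then update set
                  t.user_id = s.user_id, t.date = s.date
             when not matched then insert
                  (log_in_id, user_id, date)
                  values (s.log_in_id, s.user_id, s.date)
        """)
    finally:
        cur.close()
        conn.close()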