pyspark snowflake-cloud-data-platform

Refactoring PySpark to Snowflake Snowpark code


Most of our code is written in PySpark and executed on Databricks.

I am evaluating Snowflake and its ability to execute Python with Snowpark.

Can someone let me know how I might go about refactoring the following PySpark function to Snowpark?

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

def checkHiveTableExists(tableName,stageName="base"): 
  
  try:
    spark.sql(f"describe {stageName}{tableName}")
    return True
  except Exception as e:
    print(f"An error was thrown, <<{e}>>")
    return False

My attempt is as follows:

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
from snowflake.snowpark import session


def checkHiveTableExists(tableName,stageName="base"): 
 

  try:
    spark.sql(f"describe {stageName}{tableName}")
    return True
  except Exception as e:
    print(f"An error was thrown, <<{e}>>")
    return False
  
def main(session: snowpark.Session):
    return checkHiveTableExists(False)

But it failed.

Any thoughts?


Solution

  • Original code:

    def checkHiveTableExists(tableName,stageName="base"): 
      
      try:
        spark.sql(f"describe {stageName}{tableName}")
        return True
      except Exception as e:
        print(f"An error was thrown, <<{e}>>")
        return False
    

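    A copy-paste approach will not work in Snowflake:

    • There is no spark object. In Snowpark you call session.sql(...) on the snowpark.Session that is passed to main. In the attempt above, that session never reaches checkHiveTableExists, and main calls checkHiveTableExists(False), which passes False as the table name.
    • The SQL syntax also differs: DESC ... becomes DESC TABLE ...
    • session.sql() is lazy. The statement only runs when an action such as .collect() is called.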

    The original code is also somewhat fragile: to check whether an object exists, it runs a DESCRIBE query and treats any exception as "the object does not exist".

    Second, building the query as describe {stageName}{tableName} through string interpolation makes it prone to SQL injection. Somebody could call it as checkHiveTableExists("someName'; DROP TABLE ...").
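    If you keep the DESC approach, one way to narrow that hole is to reject anything that is not a plain identifier before building the statement. This is a minimal sketch that assumes only unquoted Snowflake identifiers are allowed (a letter or underscore first, then letters, digits, _ or $, up to 255 characters). Quoted, mixed-case names would be rejected. is_safe_identifier and build_describe are hypothetical helpers, not part of Snowpark.

```python
import re

# Hypothetical allow-list for *unquoted* Snowflake identifiers:
# first char is a letter or underscore, the rest letters, digits, _ or $,
# at most 255 characters in total. Quoted identifiers are not supported here.
_UNQUOTED_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$]{0,254}")


def is_safe_identifier(name: str) -> bool:
    """Return True only if name is a plain unquoted identifier."""
    return _UNQUOTED_IDENTIFIER.fullmatch(name) is not None


def build_describe(schema_name: str, table_name: str) -> str:
    """Build a DESC TABLE statement, refusing anything that is not a plain identifier."""
    for part in (schema_name, table_name):
        if not is_safe_identifier(part):
            # Anything with quotes, semicolons, spaces, etc. never reaches SQL
            raise ValueError(f"Unsafe identifier: {part!r}")
    return f"DESC TABLE {schema_name}.{table_name}"
```

    With this check, build_describe("BASE", "TEST") yields DESC TABLE BASE.TEST, while the injection example above raises ValueError before any SQL is sent.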


    When translating code, focus on the behavior rather than on a line-by-line port.

    Personally, I would perform a metadata check against INFORMATION_SCHEMA.TABLES and see whether a row exists. Note that the role calling this code needs privileges on the table; otherwise the table will not appear in INFORMATION_SCHEMA.
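    A minimal sketch of that metadata check, assuming the table lives in the session's current database and that both names are unquoted, so Snowflake stores them in upper case. The names are passed as bind variables through the params argument of Session.sql (available in recent Snowpark releases, with ? placeholders), so they are never spliced into the SQL text. table_exists is a hypothetical name. Snowpark is imported only for type hints, so the function accepts any object with a compatible sql() method.

```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:  # only used for the type hint; no runtime dependency
    from snowflake.snowpark import Session


def table_exists(session: Session, table_name: str, schema_name: str = "BASE") -> bool:
    """Return True if table_name exists in schema_name of the current database.

    Assumes unquoted identifiers, which Snowflake stores in upper case.
    INFORMATION_SCHEMA only lists objects the current role can access.
    """
    rows = session.sql(
        "SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES "
        "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?",
        params=[schema_name.upper(), table_name.upper()],  # bind variables, not interpolation
    ).collect()  # collect() actually executes the lazy query
    # COUNT(*) always returns exactly one row with one column
    return rows[0][0] > 0
```

    Inside a worksheet this would be called from main as table_exists(session, 'TEST'). Keep in mind that INFORMATION_SCHEMA.TABLES also lists views, so filter on TABLE_TYPE if you need to distinguish them.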

    That said, here is a version that stays as close as possible to the original code:

    import snowflake.snowpark as snowpark
    
    def checkTableExists(session: snowpark.Session, tableName, schemaName="BASE"): 
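      try:
        # session.sql() is lazy; collect() actually runs DESC TABLE, which raises
        # if the table does not exist or is not visible to the current role
        session.sql(f"DESC TABLE IDENTIFIER('{schemaName}.{tableName}')").collect()
        return True
      except Exception as e:
        # print(f"An error was thrown, <<{e}>>")
        return False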
      
    def main(session: snowpark.Session):
        return checkTableExists(session, 'TEST')