apache-spark, pyspark, aws-glue, delta-lake

Converting PySpark dataframe to a Delta Table


I am working in the AWS Glue environment. I read data from the Glue catalog as a DynamicFrame and convert it to a PySpark dataframe for my custom transformations. To upsert the new/updated data, I intend to use Delta tables.
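
For context, the read-and-convert step looks roughly like this (database and table names are placeholders):

    from pyspark.context import SparkContext
    from awsglue.context import GlueContext

    sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session

    # read from the Glue catalog as a DynamicFrame, then convert to a plain PySpark dataframe
    dyf = glueContext.create_dynamic_frame.from_catalog(
        database="my_database",   # placeholder
        table_name="my_table"     # placeholder
    )
    df = dyf.toDF()               # custom transformations happen on this dataframe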

But I'm only finding options for reading data as a Delta table from a path. I need to convert my PySpark dataframe into a Delta table so I can do merge operations. Is there any way to do this?


Solution

  • Only the destination table needs to be a Delta table. The data that you're merging in does not have to be a Delta table (if you don't have a Delta destination yet, see the sketch after the examples below). It really depends on which API you're using:

    • If you're using the Python API, then you can just use the dataframe as is (this example is based on the docs):
    from delta.tables import *
    
    # existing Delta destination table, addressed by path
    deltaTable = DeltaTable.forPath(spark, "/data/events/")
    updatesDF = ...  # your transformed dataframe
    
    # upsert: update rows that match on col1, insert everything else
    deltaTable.alias("target").merge(
        updatesDF.alias("updates"),
        "target.col1 = updates.col1") \
      .whenMatchedUpdateAll() \
      .whenNotMatchedInsertAll() \
      .execute()
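    
    If the destination is registered in the catalog rather than addressed by path, DeltaTable.forName should work the same way (the table name here is a placeholder):
    
    # destination looked up by catalog name instead of path
    deltaTable = DeltaTable.forName(spark, "my_db.events")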
    
    • If you're using the SQL MERGE command, you can just register a temp view for your dataframe and use it as input to the MERGE SQL command:
    updates_df.createOrReplaceTempView("updates")
    merge_sql = """
          MERGE INTO target
          USING updates
          ON updates.col1 = target.col1
          WHEN MATCHED THEN UPDATE SET *
          WHEN NOT MATCHED THEN INSERT *
        """
    updates_df._jdf.sparkSession().sql(merge_sql)
    

    The only catch here is that you need to use df._jdf.sparkSession().sql to execute the MERGE in the same session context where you registered the temp view.
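
If you don't have a Delta destination table yet, a minimal sketch for creating one from your dataframe looks like this (the path and table name are placeholders, and it assumes your Glue job has the Delta Lake libraries configured):

    # one-time bootstrap: write the dataframe out in Delta format at the target path
    updatesDF.write.format("delta").mode("overwrite").save("/data/events/")

    # or register it as a catalog table instead of addressing it by path
    updatesDF.write.format("delta").mode("overwrite").saveAsTable("my_db.events")

After that, subsequent runs can merge into it with either of the approaches above.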