I am working in an AWS Glue environment. I read the data from the Glue catalog as a DynamicFrame and convert it to a PySpark DataFrame for my custom transformations. To upsert the new/updated data, I intend to use Delta tables.
But I'm only finding options to read data as a Delta table from a path. I need to convert my PySpark DataFrame to a Delta table to do merge operations. Is there any way to do this?
Only the destination table needs to be a Delta table. The data that you're planning to merge in is not required to be a Delta table; a regular PySpark DataFrame works. How you do the merge depends on which API you're using. With the Python API:
from delta.tables import DeltaTable

# The destination must already exist as a Delta table at this path
deltaTable = DeltaTable.forPath(spark, "/data/events/")
updatesDF = ...  # your transformed dataframe (a regular PySpark DataFrame)

deltaTable.alias("target").merge(
    updatesDF.alias("updates"),
    "target.col1 = updates.col1") \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()
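If the destination path does not hold a Delta table yet, the very first run has nothing to merge into. A minimal sketch of handling that case, assuming the delta-spark package is available to the Glue job; the function name `upsert_to_delta` and its parameters are illustrative and not part of the Delta API:

```python
def upsert_to_delta(spark, updates_df, path, key_condition):
    """Merge updates_df into the Delta table at path, creating it on first run."""
    # Imported inside the function so this module still loads where
    # delta-spark is not installed (an assumption made for this sketch).
    from delta.tables import DeltaTable

    if not DeltaTable.isDeltaTable(spark, path):
        # First load: no destination yet, so write the DataFrame out as Delta.
        updates_df.write.format("delta").save(path)
        return

    # Destination exists: update matching rows, insert the rest.
    (
        DeltaTable.forPath(spark, path).alias("target")
        .merge(updates_df.alias("updates"), key_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
```

It would be called as, for example, `upsert_to_delta(spark, updatesDF, "/data/events/", "target.col1 = updates.col1")`.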
# SQL API: register the updates as a temp view, then merge from that view
updates_df.createOrReplaceTempView("updates")
merge_sql = """
MERGE INTO target
USING updates
ON updates.col1 = target.col1
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""
updates_df._jdf.sparkSession().sql(merge_sql)
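The only catch here is that you need to use df._jdf.sparkSession().sql to execute the SQL command, so that it runs in the same session where you registered the temp view. On PySpark 3.3 and later, df.sparkSession.sql should do the same through the public API.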
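When the match key spans several columns, the MERGE statement can be generated rather than typed by hand. A minimal sketch, where `build_merge_sql` and its parameters are hypothetical names introduced for illustration; it assumes the table, view, and column names come from trusted code, since they are interpolated without escaping:

```python
def build_merge_sql(target, source, key_columns):
    """Return a Delta MERGE statement that upserts `source` into `target`."""
    if not key_columns:
        raise ValueError("at least one key column is required")
    # Rows match when every key column is equal on both sides.
    on_clause = " AND ".join(
        f"target.{col} = updates.{col}" for col in key_columns
    )
    return (
        f"MERGE INTO {target} AS target\n"
        f"USING {source} AS updates\n"
        f"ON {on_clause}\n"
        "WHEN MATCHED THEN UPDATE SET *\n"
        "WHEN NOT MATCHED THEN INSERT *"
    )
```

The resulting string is passed to the same session as before, for example `updates_df._jdf.sparkSession().sql(build_merge_sql("target", "updates", ["col1"]))`.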