I am trying to set up a demo Glue job that demonstrates upserts using the Delta Lake framework.
I have example full-load data saved as a Delta table in an S3 bucket, defined as follows:
data = {'visitor': ['foo', 'bar', 'baz'],
        'id': [1, 2, 3],
        'B': [1, 0, 1],
        'C': [1, 0, 0]}
and example incremental data with the same schema, written to a different S3 prefix, defined as follows:
data_updated = {'visitor': ['foo_updated'],
                'id': [1],
                'B': [1],
                'C': [1]}
After executing the following statements:
delta_df = DeltaTable.forPath(spark, "s3://example_bucket/full_load")
cdc_df = spark.read.format("delta").load("s3://example_bucket/incremental_load/")

final_df = delta_df.alias("prev_df").merge(
    source=cdc_df.alias("append_df"),
    # matching on primary key
    condition=expr("prev_df.id = append_df.id")) \
    .whenMatchedUpdate(set={
        "prev_df.B": col("append_df.B"),
        "prev_df.C": col("append_df.C"),
        "prev_df.visitor": col("append_df.visitor")}) \
    .whenNotMatchedInsert(values={
        # inserting a new row to the Delta table
        "prev_df.B": col("append_df.B"),
        "prev_df.C": col("append_df.C"),
        "prev_df.visitor": col("append_df.visitor"),
    }) \
    .execute()
The tables in S3 were created as follows:
df = pd.DataFrame(data)
dataFrame = spark.createDataFrame(df)
dataFrame.write \
    .format("delta") \
    .mode("overwrite") \
    .save("s3://example_bucket/full_load")

df = pd.DataFrame(data_updated)
dataFrame = spark.createDataFrame(df)
dataFrame.write \
    .format("delta") \
    .mode("overwrite") \
    .save("s3://example_bucket/incremental_load")
The incremental row with id 1 gets appended to the full-load table instead of updating the original row. What am I doing wrong?
The overall flow should be:
Create and write a Delta table:
dataFrame.write \
    .format("delta") \
    .mode("overwrite") \
    .option("path", s3_path) \
    .saveAsTable(f"{database_name}.{table_name}")
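For `saveAsTable` and `merge` to work, the Spark session must be created with Delta Lake support enabled. The two config values below are the standard Delta Lake settings; the rest of the builder is a minimal sketch, not Glue-specific code:

```python
from pyspark.sql import SparkSession

# Standard Delta Lake session settings: register the Delta SQL extension
# and the Delta-aware catalog so "delta" tables can be created and merged.
spark = (
    SparkSession.builder
    .config("spark.sql.extensions",
            "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)
```

On recent Glue versions (3.0+), the `--datalake-formats delta` job parameter is the usual way to get this wiring.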
Merge:
delta_df.alias("prev_df").merge(
    cdc_df.alias("append_df"),
    # matching on primary key
    "prev_df.id = append_df.id") \
    .whenMatchedUpdate(set={
        "B": "append_df.B",
        "C": "append_df.C",
        "visitor": "append_df.visitor"
    }) \
    .whenNotMatchedInsert(values={
        # inserting a new row into the Delta table
        "id": "append_df.id",
        "B": "append_df.B",
        "C": "append_df.C",
        "visitor": "append_df.visitor"
    }) \
    .execute()
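To see what this merge should produce for the example data, here is a plain-Python simulation of the upsert semantics (a dict-based sketch, not the Delta API; `simulate_merge` is a hypothetical helper): a source row that matches on `id` overwrites the target row, and a non-matching row is inserted.

```python
def simulate_merge(target, updates, key="id"):
    """Sketch of MERGE semantics: whenMatchedUpdate + whenNotMatchedInsert."""
    merged = {row[key]: dict(row) for row in target}
    for row in updates:
        if row[key] in merged:
            merged[row[key]].update(row)   # whenMatchedUpdate
        else:
            merged[row[key]] = dict(row)   # whenNotMatchedInsert
    return sorted(merged.values(), key=lambda r: r[key])

full_load = [
    {"id": 1, "visitor": "foo", "B": 1, "C": 1},
    {"id": 2, "visitor": "bar", "B": 0, "C": 0},
    {"id": 3, "visitor": "baz", "B": 1, "C": 0},
]
incremental = [{"id": 1, "visitor": "foo_updated", "B": 1, "C": 1}]

result = simulate_merge(full_load, incremental)
# id 1 is updated in place; ids 2 and 3 are untouched; nothing is appended
```

Three rows in, one matching update, three rows out — which is exactly what the corrected Delta merge should leave in the full-load table.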