We have a Delta merge in our Spark + Scala code, as follows:
deltaTable.as("existing")
.merge(dfNewData.as("new"), "new.SerialNumber = existing.SerialNumber and new.RSid = existing.RSid")
.whenMatched()
.update(Map("Id1" -> col("new.Id1"),... => around 20 columns
))
.whenNotMatched()
.insertAll()
.execute()
To improve performance, I want to take advantage of partition pruning (https://kb.databricks.com/delta/delta-merge-into).
The SerialNumber field in the code above is processed in chunks, each bounded by a minimum and maximum value (held in the Int variables SerialMin and SerialMax). I want to use these vals in the merge condition, something like:
deltaTable.as("existing")
.merge(dfNewData.as("new"), "existing.SerialNumber >= lit(SerialMin) and existing.SerialNumber < lit(SerialMax) and new.SerialNumber = existing.SerialNumber and new.RSid = existing.RSid")
.whenMatched()
.update(Map("Id1" -> col("new.Id1"),... => around 20 columns
))
.whenNotMatched()
.insertAll()
.execute()
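But the above code throws an error saying that this is not supported. Is there a way to use variables in the Delta merge call?
The issue is resolved with the code below. The merge condition is a plain string that is parsed as SQL, so Scala vals are not visible inside it; Scala string interpolation (the s prefix) embeds their values as literals instead: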
deltaTable.as("existing")
.merge(dfNewData.as("new"), s"existing.SerialNumber >= $SerialMin and existing.SerialNumber < $SerialMax and new.SerialNumber = existing.SerialNumber and new.RSid = existing.RSid")
.whenMatched()
.update(Map("Id1" -> col("new.Id1"),... => around 20 columns
))
.whenNotMatched()
.insertAll()
.execute()
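Because the condition is only a string, it can help to build it separately and check it before passing it to merge. The following is a minimal sketch of that idea, not part of the Delta Lake API: the object MergeCondition and its build method are hypothetical names introduced here for illustration, and the column names are the ones from the code above.

```scala
// Hypothetical helper (not a Delta Lake API): builds the merge condition as a
// SQL string, with the chunk bounds embedded as integer literals.
object MergeCondition {
  def build(serialMin: Int, serialMax: Int): String =
    Seq(
      // Range predicates on the target table, intended to help pruning
      s"existing.SerialNumber >= $serialMin",
      s"existing.SerialNumber < $serialMax",
      // Join predicates between source ("new") and target ("existing")
      "new.SerialNumber = existing.SerialNumber",
      "new.RSid = existing.RSid"
    ).mkString(" and ") // joining a list avoids a stray duplicated "and"
}
```

The result would then be passed as the second argument, for example .merge(dfNewData.as("new"), MergeCondition.build(SerialMin, SerialMax)). Since the bounds are Int values, interpolating them directly is safe; string values would need quoting and escaping.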