I am writing streams to delta lake through spark structured streaming. Each streaming batch contains key - value (also contains timestamp as one column). delta lake doesn't support of update with multiple same keys at source( steaming batch) So I want to update delta lake with only record with latest timestamp. How can I do this ?
This is code snippet I am trying:
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
println(s"Executing batch $batchId ...")
microBatchOutputDF.show()
deltaTable.as("t")
.merge(
microBatchOutputDF.as("s"),
"s.key = t.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
}
Thanks in advance.
You can eliminate records having older timestamp from your "microBatchOutputDF" dataframe & keep only record with latest timestamp for given key.
You can use spark's 'reduceByKey' operation & implement custom reduce function as below.
def getLatestEvents(input: DataFrame) : RDD[Row] = {
input.rdd.map(x => (x.getAs[String]("key"), x)).reduceByKey(reduceFun).map(_._2) }
def reduceFun(x: Row, y: Row) : Row = {
if (x.getAs[Timestamp]("timestamp").getTime > y.getAs[Timestamp]("timestamp").getTime) x else y }
Assumed key is of type string & timestamp of type timestamp. And call "getLatestEvents" for your streaming batch 'microBatchOutputDF'. It ignores older timestamp events & keeps only latest one.
val latestRecordsDF = spark.createDataFrame(getLatestEvents(microBatchOutputDF), <schema of DF>)
Then call deltalake merge operation on top of 'latestRecordsDF'