Tags: apache-spark, spark-streaming, delta-lake

Stream writes with multiple identical keys to Delta Lake


I am writing streams to Delta Lake through Spark Structured Streaming. Each streaming batch contains key-value records (with a timestamp as one of the columns). Delta Lake's merge does not allow multiple rows in the source (the streaming batch) to match the same target key, so I want to update Delta Lake with only the record carrying the latest timestamp for each key. How can I do this?

This is the code snippet I am trying:

def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long): Unit = {

  println(s"Executing batch $batchId ...")
  microBatchOutputDF.show()

  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}
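
The function is wired into the streaming query roughly like this (the table path '/delta/events' and the source 'streamingDF' are just placeholders, not from my actual job):

import io.delta.tables.DeltaTable

// Placeholder table location and streaming source
val deltaTable = DeltaTable.forPath(spark, "/delta/events")

streamingDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()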

Thanks in advance.


Solution

  • You can eliminate the records with older timestamps from your "microBatchOutputDF" DataFrame and keep only the record with the latest timestamp for each key.

    You can use Spark's 'reduceByKey' operation on the underlying RDD and implement a custom reduce function as below.

    import java.sql.Timestamp
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row}

    // Keep only the row with the latest timestamp for each key.
    def getLatestEvents(input: DataFrame): RDD[Row] =
      input.rdd
        .map(row => (row.getAs[String]("key"), row))
        .reduceByKey(reduceFun)
        .map(_._2)

    // Of two rows sharing a key, the one with the newer timestamp wins.
    def reduceFun(x: Row, y: Row): Row =
      if (x.getAs[Timestamp]("timestamp").getTime > y.getAs[Timestamp]("timestamp").getTime) x else y


    This assumes 'key' is of type string and 'timestamp' is of type timestamp. Call "getLatestEvents" on your streaming batch 'microBatchOutputDF'; it discards the older-timestamp events and keeps only the latest one per key.

    val latestRecordsDF = spark.createDataFrame(getLatestEvents(microBatchOutputDF), <schema of DF>)
    
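    Since the rows in 'latestRecordsDF' come straight from 'microBatchOutputDF', the schema placeholder above can usually be filled with that DataFrame's own schema:

    val latestRecordsDF = spark.createDataFrame(getLatestEvents(microBatchOutputDF), microBatchOutputDF.schema)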

    Then call the Delta Lake merge operation on 'latestRecordsDF', as in the sketch below.
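
    Putting the pieces together, the foreachBatch function from the question could look roughly like this (a sketch that assumes 'deltaTable' and 'spark' are in scope, as in the question):

    import org.apache.spark.sql.DataFrame

    def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
      println(s"Executing batch $batchId ...")

      // Deduplicate the batch so each key appears once, with its latest timestamp.
      val latestRecordsDF = spark.createDataFrame(
        getLatestEvents(microBatchOutputDF),
        microBatchOutputDF.schema)

      // Merge the deduplicated batch into the Delta table.
      deltaTable.as("t")
        .merge(latestRecordsDF.as("s"), "s.key = t.key")
        .whenMatched().updateAll()
        .whenNotMatched().insertAll()
        .execute()
    }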