Tags: apache-spark, databricks, spark-structured-streaming, delta-lake

How to get new/updated records from Delta table after upsert using merge?


Is there any way to get the updated/inserted rows after an upsert using merge into a Delta table in a Spark streaming job?


import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame

val df = spark.readStream(...)
val deltaTable = DeltaTable.forName("...")

// Upsert each micro-batch into the Delta table
def upsertToDelta(events: DataFrame, batchId: Long): Unit = {
  deltaTable.as("table")
    .merge(
      events.as("event"),
      "event.entityId == table.entityId")
    .whenMatched()
    .updateExpr(...)
    .whenNotMatched()
    .insertAll()
    .execute()
}

df
  .writeStream
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

I know I can create another job to read the updates from the Delta table. But is it possible to do this in the same job? As far as I can see, execute() returns Unit.


Solution

  • You can enable Change Data Feed on the table and then have another stream or batch job fetch the changes, so you'll be able to see which rows were inserted, updated, or deleted. It can be enabled with:

    ALTER TABLE table_name SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    

    If the table isn't registered in the metastore, you can use its path instead of the table name:

    ALTER TABLE delta.`path` SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    

    The changes will be available if you add .option("readChangeFeed", "true") when reading a stream from the table:

    spark.readStream.format("delta") \
      .option("readChangeFeed", "true") \
      .table("table_name")
    

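    This adds three columns describing each change: _change_type, _commit_version and _commit_timestamp. The most important one is _change_type. Note that an update produces two rows with different types: update_preimage (the old values) and update_postimage (the new values).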

    Having another stream isn't a problem either, because you can run multiple streams inside the same job. Instead of calling .awaitTermination() on a single query, use something like spark.streams.awaitAnyTermination() to wait on all of them.

    P.S. The answer might be different if you explain why you need to get the changes inside the same job.