Tags: apache-spark, databricks, spark-structured-streaming, delta-live-tables

spark streaming and delta tables: java.lang.UnsupportedOperationException: Detected a data update


The setup:

Azure Event Hub -> raw delta table -> agg1 delta table -> agg2 delta table

The data is processed with Spark Structured Streaming.

Updates on the target delta tables are done via foreachBatch using MERGE.
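
A minimal sketch of that pattern for the first aggregation (raw -> agg1), assuming hypothetical table names ("raw", "agg1"), an event-time column "ts", and a checkpoint path, none of which come from the question:

```python
from delta.tables import DeltaTable
from pyspark.sql import functions as F

# `spark` is the SparkSession provided by the Databricks runtime.
# Hypothetical upsert, called once per micro-batch by foreachBatch.
def upsert_agg1(batch_df, batch_id):
    target = DeltaTable.forName(batch_df.sparkSession, "agg1")
    (target.alias("t")
           .merge(batch_df.alias("s"), "t.minute = s.minute")
           .whenMatchedUpdateAll()
           .whenNotMatchedInsertAll()
           .execute())

agg1_query = (
    spark.readStream.table("raw")
         # per-minute granularity for agg1
         .groupBy(F.date_trunc("minute", F.col("ts")).alias("minute"))
         .agg(F.count("*").alias("events"))
         .writeStream
         .foreachBatch(upsert_agg1)
         .outputMode("update")
         .option("checkpointLocation", "/checkpoints/agg1")
         .start()
)
```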

As a result I'm getting this error:

java.lang.UnsupportedOperationException: Detected a data update (for example partKey=ap-2/part-00000-2ddcc5bf-a475-4606-82fc-e37019793b5a.c000.snappy.parquet) in the source table at version 2217. This is currently not supported. If you'd like to ignore updates, set the option 'ignoreChanges' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory.

Basically I'm not able to read the agg1 delta table via any kind of streaming. If I switch the sink of the last stream from delta to memory I get the same error message. With the first stream I don't have any problems.

Notes:

  1. Between aggregations I'm changing the granularity: the agg1 delta table truncates dates to minutes, the agg2 delta table truncates them to days.
  2. If I turn off all the other streams, the last one still doesn't work.
  3. The agg2 delta table is a fresh table with no data.

Solution

  • How streaming works on a source Delta table: it reads the files that belong to the source table. It cannot handle changes to those files (updates, deletes); if anything like that happens, you get the error above. In other words, DML operations rewrite the underlying files. The only exception is INSERT: new data arrives in new files, unless configured differently.

    To fix that you would need to set the option ignoreChanges to true. With this option, whenever a file is rewritten you receive every record from that file again: the same records as before, plus the modified ones (see the first sketch at the end of this answer).

    The problem: we have aggregations, and the aggregated values are stored in the checkpoint state. If we receive the same (unmodified) record again, we treat it as new input and increase the aggregate for its grouping key, i.e. we double-count it.

    Solution: we can't read the agg table to build further aggregations on top of it. We need to read the raw table instead (see the second sketch at the end of this answer).

    reference: https://docs.databricks.com/structured-streaming/delta-lake.html#ignore-updates-and-deletes

    Note: I'm working on Databricks Runtime 10.4, so low shuffle merge is used by default.
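
    For illustration, here are minimal sketches of both options, with the same assumed names as in the question sketch above. First, ignoring changes: the stream keeps running, but every rewritten file is replayed in full.

    ```python
    # Rewritten files are re-emitted completely: already-seen rows come back,
    # so a stateful aggregation reading this stream would double-count them.
    agg1_stream = (
        spark.readStream
             .option("ignoreChanges", "true")
             .table("agg1")
    )
    ```

    Second, the approach taken here: build the day-level aggregate from the raw table, whose files are append-only, instead of from agg1.

    ```python
    from delta.tables import DeltaTable
    from pyspark.sql import functions as F

    # Hypothetical upsert for agg2, keyed by day.
    def upsert_agg2(batch_df, batch_id):
        target = DeltaTable.forName(batch_df.sparkSession, "agg2")
        (target.alias("t")
               .merge(batch_df.alias("s"), "t.day = s.day")
               .whenMatchedUpdateAll()
               .whenNotMatchedInsertAll()
               .execute())

    agg2_query = (
        spark.readStream.table("raw")  # append-only source, so no rewritten files
             .groupBy(F.date_trunc("day", F.col("ts")).alias("day"))
             .agg(F.count("*").alias("events"))
             .writeStream
             .foreachBatch(upsert_agg2)
             .outputMode("update")
             .option("checkpointLocation", "/checkpoints/agg2")
             .start()
    )
    ```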