pyspark, databricks, delta-lake

DeltaLake/DeltaTable merge operation inserts/duplicates matched rows instead of updating them


The following URLs are for two DeltaTables, ake_original and ake_updates:

  • ake_original
  • ake_updates

Each DeltaTable has two columns:

  • AKE_ID PRIMARY KEY pa.string()
  • MODIFIED_DATE pa.timestamp('us')

Details:

  • The first DeltaTable, 'ake_original', has 59,767 rows, covering all the data inserted on 07 June 2024 from 00:00:00 to 23:59:59.
  • The second DeltaTable, 'ake_updates', has 6,262 rows, representing some of the data inserted on 07 June 2024 from 00:00:00 to 23:59:59 and then updated on 08 June 2024 from 00:00:00 to 23:59:59.

The Python code below merges the updated rows into the original rows using the unique column AKE_ID, but it results in row duplication.

It inserts/duplicates 4,276 rows and updates 1,986 rows.
What is the reason for the data duplication?
from deltalake import DeltaTable, write_deltalake
import pyarrow.dataset as ds

deltaTable = DeltaTable('ake_original')
dataset_update = ds.dataset('ake_updates')
df = dataset_update.to_table()

(
    deltaTable.merge(
        source=df,
        predicate="s.AKE_ID = t.AKE_ID", 
        source_alias="s",
        target_alias="t",
    )
    .when_matched_update_all()      # update target rows whose AKE_ID matches
    .when_not_matched_insert_all()  # insert source rows with no matching AKE_ID
    .execute()
)

print(deltaTable.history())
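
For reference, the insert/update counts above can also be captured programmatically: on recent deltalake releases, TableMerger.execute() returns a dictionary of merge metrics. A minimal sketch, assuming such a release; the metric key names follow delta-rs and may differ between versions, so check metrics.keys() first:

metrics = (
    deltaTable.merge(
        source=df,
        predicate="s.AKE_ID = t.AKE_ID",
        source_alias="s",
        target_alias="t",
    )
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute()
)
# Key names are assumptions based on delta-rs merge metrics.
print(metrics.get("num_target_rows_inserted"), metrics.get("num_target_rows_updated"))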

Solution

  • I took a look at the _delta_log of the original table. You were not doing inserts; you were doing overwrites, so the original table is not 59,767 records long, but just 9,767.

    For example, here are the changes made by the 0th and the 1st revisions:

    ...0000.json
    {"add":{"path":"0-0869ed37-2f6c-4987-a403-f9735a6d1eba-0.parquet", ....
    
    ...0001.json
    
    {"add":{"path":"1-b46332c9-3183-40d9-a78c-89dbffbd430e-0.parquet"...
    {"remove":{"path":"0-0869ed37-2f6c-4987-a403-f9735a6d1eba-0.parquet",....{"mode":"Overwrite","partitionBy":"[]"...
    

    The 1st revision removes the file created by the 0th revision and adds a new one, but that new file is just 10,000 records.

    You can check the rest of the log: each revision removes the previous file and creates a new one.
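
    One way to confirm this without opening the JSON files by hand is to print the table history, which records the operation and write mode of every revision. A minimal sketch using DeltaTable.history(); the exact keys of the returned dictionaries can vary between deltalake versions:

    from deltalake import DeltaTable

    dt = DeltaTable('ake_original')

    # Each entry describes one commit; overwrites typically report
    # mode "Overwrite" in their operationParameters.
    for entry in dt.history():
        print(entry.get("version"), entry.get("operation"), entry.get("operationParameters"))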

    Solution: when you are populating the original table, you should use mode="append".
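
    A minimal sketch of what the populating step could look like with mode="append" (write_deltalake and the table path come from the question; the sample batch and column values are illustrative):

    from datetime import datetime

    import pyarrow as pa
    from deltalake import write_deltalake

    # Illustrative one-row batch; in practice this is each chunk of source data.
    batch = pa.table({
        "AKE_ID": pa.array(["example-id"], type=pa.string()),
        "MODIFIED_DATE": pa.array([datetime(2024, 6, 7)], type=pa.timestamp("us")),
    })

    # mode="append" adds new files to the table instead of replacing the
    # previous ones, so earlier batches remain part of the latest version.
    write_deltalake("ake_original", batch, mode="append")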


    To recover the overwritten records (I have never done this myself, but you can try):