Tags: sql, apache-spark-sql, amazon-athena, apache-iceberg

Athena/Iceberg MERGE INTO not applying both DELETE and UPDATE on the same key


I want to merge a staging table into a main table. The staging table looks like this:

name | id | keys_id | event_name | event_time
a    | 1  | 1       | INSERT     | 1
b    | 1  | 1       | MODIFY     | 1
     |    | 1       | REMOVE     | 1

I have this MERGE INTO query:

MERGE INTO new_db_test.test_iceberg_table AS target USING new_db_test.staging_table
AS source
ON target.id = source.keys_id
WHEN MATCHED AND source.event_name = 'MODIFY' 
THEN UPDATE SET name = source.name, id = source.id
WHEN MATCHED AND source.event_name = 'REMOVE'
THEN DELETE
WHEN NOT MATCHED AND source.event_name = 'INSERT' 
THEN INSERT (id, name) 
VALUES (source.id, source.name)

I expect test_iceberg_table to be empty after this, but the REMOVE event is not applied and the table still contains the row with the updated value:

name | id 
b    | 1  

When I remove the condition for the update, the record is deleted just fine. What could be wrong here? I would also appreciate a resource where I can learn how Iceberg applies MERGE INTO.


Solution

  • Each target row is affected only once: if multiple source rows match it, a single arbitrary source row is chosen. (In some DBMSs you get an error when this happens.)

    So you need to pre-aggregate the source so that each key has only one row, choosing that row in such a way that REMOVE takes precedence.

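    MERGE INTO new_db_test.test_iceberg_table AS target
    USING (
        -- Keep exactly one staging row per key. Sorting event_name descending
        -- ranks 'REMOVE' > 'MODIFY' > 'INSERT' (plain alphabetical order),
        -- so REMOVE wins whenever it is present.
        -- The de-duplicated source is an inline subquery in USING,
        -- a form MERGE accepts in both Athena and Spark.
        SELECT *
        FROM (
            SELECT s.*,
              ROW_NUMBER() OVER (PARTITION BY s.keys_id ORDER BY s.event_name DESC) AS rn
            FROM new_db_test.staging_table AS s
        ) ranked
        WHERE ranked.rn = 1
    ) AS source
    ON target.id = source.keys_id
    WHEN MATCHED AND source.event_name = 'MODIFY'
      THEN UPDATE SET
        name = source.name,
        id = source.id
    WHEN MATCHED AND source.event_name = 'REMOVE'
      THEN DELETE
    WHEN NOT MATCHED AND source.event_name = 'INSERT'
      THEN INSERT
        (id, name)
      VALUES
        (source.id, source.name);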
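If you want to sanity-check the de-duplication locally before running it against Athena, here is a minimal sketch using Python's built-in sqlite3 module (window functions need SQLite 3.25 or newer). It is only an approximation: SQLite has no MERGE INTO, so the WHEN clauses are modeled by hand in a hypothetical `apply_merge` helper, which is not how Iceberg actually executes a merge. The names `STAGING`, `DEDUP_SQL`, `deduplicated_source` and `apply_merge` are made up for illustration, and the starting target row `{1: "a"}` is an assumption (the question's output implies the target already held id = 1).

```python
import sqlite3

# Staging rows from the question: (name, id, keys_id, event_name, event_time).
# The REMOVE row has no name/id, so those columns are NULL (None).
STAGING = [
    ("a", 1, 1, "INSERT", 1),
    ("b", 1, 1, "MODIFY", 1),
    (None, None, 1, "REMOVE", 1),
]

# Same ROW_NUMBER() filter as the answer: one row per keys_id, and since
# 'REMOVE' > 'MODIFY' > 'INSERT' alphabetically, DESC order ranks REMOVE first.
DEDUP_SQL = """
SELECT name, id, keys_id, event_name
FROM (
    SELECT s.*,
           ROW_NUMBER() OVER (PARTITION BY s.keys_id ORDER BY s.event_name DESC) AS rn
    FROM staging_table AS s
) ranked
WHERE ranked.rn = 1
"""


def deduplicated_source(rows):
    """Run the de-duplication query in a throwaway in-memory SQLite database."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE staging_table "
        "(name TEXT, id INTEGER, keys_id INTEGER, event_name TEXT, event_time INTEGER)"
    )
    conn.executemany("INSERT INTO staging_table VALUES (?, ?, ?, ?, ?)", rows)
    result = conn.execute(DEDUP_SQL).fetchall()
    conn.close()
    return result


def apply_merge(target, source):
    """Hypothetical hand-written model of the MERGE clauses.

    target maps id -> name. source is assumed to hold at most one row per
    keys_id, so every target row is matched by at most one source row.
    """
    result = dict(target)
    for name, new_id, keys_id, event_name in source:
        if keys_id in target:                # WHEN MATCHED (against the original target)
            if event_name == "MODIFY":       # ... AND 'MODIFY' THEN UPDATE
                del result[keys_id]
                result[new_id] = name
            elif event_name == "REMOVE":     # ... AND 'REMOVE' THEN DELETE
                del result[keys_id]
        elif event_name == "INSERT":         # WHEN NOT MATCHED AND 'INSERT' THEN INSERT
            result[new_id] = name
    return result


if __name__ == "__main__":
    source = deduplicated_source(STAGING)
    print(source)                                        # [(None, None, 1, 'REMOVE')]
    print(apply_merge({1: "a"}, source))                 # {}
    # Had the MODIFY row been the one picked for id 1, you would get
    # the question's result instead:
    print(apply_merge({1: "a"}, [("b", 1, 1, "MODIFY")]))  # {1: 'b'}
```

The last call shows why the un-deduplicated merge can leave `b | 1` behind: whichever single source row gets applied to the target row decides the outcome, so picking that row explicitly is what makes REMOVE win.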