Tags: apache-spark, apache-spark-sql, databricks, azure-databricks, delta-lake

How to upsert data with multiple source rows matching the target table in Databricks Delta Tables


Problem description

My intention is to execute the following action:

Given a table with new data with one or more dates:

new_data

And a target table with historical data like the one below:

historical_data

I would like to replace the range of dates (in this example, just one day: 17/10/2022) in the historical data, so that the result looks like the image below:

desired_result

Tentative

To reach this result, my first implementation used the MERGE INTO Databricks SQL clause below:

MERGE INTO historical_data
    USING new_data
    ON historical_data.Date = new_data.Date
    WHEN MATCHED
        THEN UPDATE SET *
    WHEN NOT MATCHED
        THEN INSERT *

But this code raises an error like this:

UnsupportedOperationException: Cannot perform Merge as multiple source rows matched and attempted to modify the
same target row in the Delta table in possibly conflicting ways.

The error is self-explanatory, and the only way I found to solve the problem was to implement a two-step SQL approach.
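To make the failure concrete, here is a minimal sketch that reproduces it (the Value column and the literal rows are illustrative, not taken from the original tables):

    -- Minimal reproduction sketch (the Value column is hypothetical).
    CREATE TABLE historical_data (Date DATE, Value INT) USING DELTA;
    INSERT INTO historical_data VALUES (DATE'2022-10-17', 99);

    CREATE TABLE new_data (Date DATE, Value INT) USING DELTA;
    -- Two source rows share the same Date, so the MERGE condition
    -- historical_data.Date = new_data.Date matches both of them against the
    -- single target row, and Delta cannot decide which update should win.
    INSERT INTO new_data VALUES (DATE'2022-10-17', 10), (DATE'2022-10-17', 20);

Running the MERGE above against these two tables raises the UnsupportedOperationException, because both source rows try to modify the same target row.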

My current solution

First, delete the date range from the target table, then insert the new content.

Here you can see the implementation:

--DELETE THE RANGE FROM THE TARGET TABLE
DELETE FROM historical_data
WHERE
  historical_data.Date >= (SELECT MIN(new_data.Date) FROM new_data) AND 
  historical_data.Date <= (SELECT MAX(new_data.Date) FROM new_data);

-- INSERT THE NEW DATA INTO THE HISTORICAL TABLE
INSERT INTO TABLE historical_data
SELECT * FROM new_data;

Drawbacks and the main question

The drawback of this implementation is that it is not atomic: the action is done in two separate steps. This is not the behavior I want, because if the script breaks between the two statements, the table can be left with the old data deleted and the new data missing. I am looking for a way to solve that. Does someone know how to help me?


Solution

  • Well, now there is a new option in Databricks called REPLACE WHERE:

    INSERT INTO [ TABLE ] table_name
        REPLACE WHERE predicate
        query
    

    According to the documentation, this feature fits exactly the problem I had in the past. Please see the quote:

    If table_name is a Delta Lake table, delete rows matching boolean_expression before inserting any rows matching boolean-expression specified in query. Rows in query which do not match boolean_expression are ignored.

    More info here
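
    Applied to the example above, the whole upsert becomes a single atomic statement. This is a sketch: REPLACE WHERE takes a predicate over the target table's columns, so the range boundaries are written here as date literals matching the example day rather than as subqueries over new_data:

        -- Atomically delete every row in the 17/10/2022 range and insert the
        -- replacement rows from new_data in one Delta Lake transaction.
        INSERT INTO historical_data
            REPLACE WHERE Date >= DATE'2022-10-17' AND Date <= DATE'2022-10-17'
            SELECT * FROM new_data;

    Because the delete and the insert happen in the same transaction, a failure can no longer leave the table with the range deleted but the new rows missing.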