pyspark, azure-synapse, delta-lake

How to update already inserted data in a Delta table and insert new rows?


I have a dataframe in PySpark built by reading the same file every day. New data gets added to this file, and I insert the data into a Delta table.

I want to update rows that were already inserted, because data is sometimes updated in the file I read.

My dataframe looks like this:

col1    col2    col3           col4
name_1  city_1  date_of_today  {"Date": X, "Weight (KG)": 13.4}
name_1  city_1  date_of_today  {"Date": X, "Weight (KG)": 1000}

When I insert it into the Delta table, I want to update rows where a condition is met.

I did this:

from delta.tables import DeltaTable
deltaTable = DeltaTable.forPath(spark, "abfss://[email protected]/db/db_name/table/")

deltaTable.alias("target").merge(
    df_content_transformed.alias("source"),
    condition
    ) \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()

The condition value is:

"""target.name = source.name AND target.city= source.city AND get_json_object(target.value, '$.Date') = get_json_object(source.value, '$.Date') AND get_json_object(target.value, '$.Weight (KG)') = get_json_object(source.value, '$.Weight (KG)')"""

When I execute a pipeline calling this code, I get the following error:

Py4JJavaError: An error occurred while calling o4197.execute.
: org.apache.spark.sql.delta.DeltaUnsupportedOperationException: Cannot perform Merge as multiple source rows matched and attempted to modify the same
target row in the Delta table in possibly conflicting ways. By SQL semantics of Merge,
when multiple source rows match on the same target row, the result may be ambiguous
as it is unclear which source row should be used to update or delete the matching
target row. You can preprocess the source table to eliminate the possibility of
multiple matches.

What can I do?


Solution

  • You need to ensure that for every row in your target table, there is at most one corresponding row in your source dataframe that matches the merge condition (a quick way to check this is sketched at the end of this answer).

    To solve this:

    1. Deduplicate the Source DataFrame: Before the merge, deduplicate the df_content_transformed dataframe to make sure there is only one row for each combination of values that you use in your merge condition.

      from pyspark.sql import Window
      import pyspark.sql.functions as F
      
      # Partition by every expression used in the merge condition, order by date_of_today,
      # and keep only the first row of each partition.
      window_spec = Window.partitionBy("name", "city", F.expr("get_json_object(value, '$.Date')"), F.expr("get_json_object(value, '$.Weight (KG)')")).orderBy("date_of_today")
      
      df_deduplicated = df_content_transformed.withColumn("row_num", F.row_number().over(window_spec)).filter(F.col("row_num") == 1).drop("row_num")
      
    2. Perform the Merge with the Deduplicated DataFrame: Now use the deduplicated dataframe (df_deduplicated) in your merge operation.

      deltaTable.alias("target").merge(
          df_deduplicated.alias("source"),
          condition
          ) \
      .whenMatchedUpdateAll() \
      .whenNotMatchedInsertAll() \
      .execute()
      

    In the deduplication step above, I've used the date_of_today column to choose which row to keep when there are duplicates. This is only sample logic; you can adjust it to your requirements. For example, the sketch right after this note shows how to keep the most recent row for each key instead.
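
    As a minimal sketch of that adjustment, here is the same deduplication but keeping the most recent row per merge key instead of the first one. It assumes date_of_today is a date or timestamp that reflects recency, and it uses the column names from the merge condition (name, city, value, date_of_today) rather than the col1-col4 headers shown in the sample dataframe.

      from pyspark.sql import Window
      import pyspark.sql.functions as F
      
      # Order each group of duplicate merge keys by date_of_today descending,
      # so row_number() == 1 picks the latest row.
      latest_first = Window.partitionBy(
          "name",
          "city",
          F.expr("get_json_object(value, '$.Date')"),
          F.expr("get_json_object(value, '$.Weight (KG)')")
      ).orderBy(F.col("date_of_today").desc())
      
      df_deduplicated = (
          df_content_transformed
          .withColumn("row_num", F.row_number().over(latest_first))
          .filter(F.col("row_num") == 1)
          .drop("row_num")
      )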
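
    If you want to confirm where the conflict comes from before (or after) deduplicating, a quick diagnostic is to group the source dataframe by the same merge-key expressions and look for counts greater than one. This is only a check under the same column-name assumptions as above; it is not part of the merge itself.

      import pyspark.sql.functions as F
      
      # Any row returned here is a merge key that appears more than once in the source
      # and could trigger the "multiple source rows matched" error.
      conflicts = (
          df_content_transformed
          .groupBy(
              "name",
              "city",
              F.expr("get_json_object(value, '$.Date')").alias("json_date"),
              F.expr("get_json_object(value, '$.Weight (KG)')").alias("json_weight")
          )
          .count()
          .filter(F.col("count") > 1)
      )
      
      conflicts.show(truncate=False)  # empty output means every merge key is unique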