Tags: python, apache-spark, pyspark, sql-merge

Oracle MERGE rewritten to PySpark. If null - update, otherwise - insert


These are my tables:
destination

+---+---+----+
| c1| c2|  d1|
+---+---+----+
|  a|  b|   5|
|  c|  d|null|
+---+---+----+

new_data

+---+---+----+
| c1| c2|  d1|
+---+---+----+
|  a|  b|   1|
|  c|  d|   6|
|  e|  f|   7|
|  g|  h|null|
+---+---+----+

In Oracle SQL I can do this:

MERGE INTO destination d
    USING new_data n
    ON (d.c1 = n.c1 AND d.c2 = n.c2)
  WHEN MATCHED THEN
    UPDATE SET d.d1 = n.d1
         WHERE d.d1 IS NULL
  WHEN NOT MATCHED THEN
    INSERT (c1, c2, d1)
    VALUES (n.c1, n.c2, n.d1);

Then destination table becomes this:

+---+---+----+
| c1| c2|  d1|
+---+---+----+
|  a|  b|   5|
|  c|  d|   6|
|  e|  f|   7|
|  g|  h|null|
+---+---+----+

If (c1, c2) exists in destination and d1 is null, d1 gets updated.
If (c1, c2) does not exist, the row gets inserted.

Is there a way to do the same in PySpark?

This generates the dataframes:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

dCols = ['c1', 'c2', 'd1']
dData = [('a', 'b', 5), 
         ('c', 'd', None)]
destination = spark.createDataFrame(dData, dCols)

nData = [('a', 'b', 1),
         ('c', 'd', 6),
         ('e', 'f', 7),
         ('g', 'h', None)]
new_data = spark.createDataFrame(nData, dCols)

PySpark has an equivalent for almost everything in SQL, but I can't find one for MERGE.


Solution

  • In SQL, this MERGE can be replaced by a full outer join (a left join unioned with a right join is equivalent to a full outer join):

    # coalesce keeps dest.d1 when it is already set, otherwise takes src.d1;
    # this reproduces "UPDATE ... WHERE d.d1 IS NULL" plus the INSERT branch
    merged = destination.alias("dest").join(new_data.alias("src"), ["c1", "c2"], "full") \
        .selectExpr("c1", "c2", "coalesce(dest.d1, src.d1) as d1")
    
    merged.show()
    
    #+---+---+----+
    #| c1| c2|  d1|
    #+---+---+----+
    #|  e|  f|   7|
    #|  g|  h|null|
    #|  c|  d|   6|
    #|  a|  b|   5|
    #+---+---+----+
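
    If you prefer the DataFrame API over selectExpr, the same merge can be written with F.coalesce; a minimal sketch using the F alias imported in the question:

    merged = (
        destination.alias("dest")
        .join(new_data.alias("src"), ["c1", "c2"], "full")
        .select(
            "c1",
            "c2",
            # keep the existing dest.d1, fall back to src.d1 when it is null
            F.coalesce(F.col("dest.d1"), F.col("src.d1")).alias("d1"),
        )
    )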
    

    However, every time you perform this merge you'll need to rewrite all of your data into the destination, because Spark doesn't support in-place updates, and that can lead to bad performance. So if you really need to do this, I'd advise you to take a look at Delta Lake, which brings ACID transactions to Spark and supports the MERGE syntax.
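
    For illustration, here is what that MERGE looks like with Delta Lake's Python API. This is a sketch, assuming the delta-spark package is installed and that destination has been saved as a Delta table at /tmp/destination_delta (a hypothetical path, not from the question):

    from delta.tables import DeltaTable

    # Hypothetical path: the destination DataFrame must first be written
    # there once as a Delta table, e.g.:
    # destination.write.format("delta").save("/tmp/destination_delta")
    dest_table = DeltaTable.forPath(spark, "/tmp/destination_delta")

    (dest_table.alias("d")
        .merge(new_data.alias("n"), "d.c1 = n.c1 AND d.c2 = n.c2")
        .whenMatchedUpdate(
            condition="d.d1 IS NULL",      # WHEN MATCHED ... WHERE d.d1 IS NULL
            set={"d1": "n.d1"})            # UPDATE SET d.d1 = n.d1
        .whenNotMatchedInsert(
            values={"c1": "n.c1", "c2": "n.c2", "d1": "n.d1"})
        .execute())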