Oracle MERGE rewritten to PySpark. If null - update, otherwise - insert

These are my tables:
In Oracle SQL I can do this:

MERGE INTO destination d
    USING new_data n
    ON (d.c1 = n.c1 AND d.c2 = n.c2)
    UPDATE SET d.d1 = n.d1
         WHERE d.d1 IS NULL
    INSERT (c1, c2, d1)
    VALUES (n.c1, n.c2, n.d1);

Then destination table becomes this:
If c1, c2 exists in destination and d1 is null, d1 gets updated.
If c1, c2 do not exist, rows get inserted.

Is there a way to do the same in PySpark?

This generates the dataframes:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

dCols = ['c1', 'c2', 'd1']
dData = [('a', 'b', 5), 
         ('c', 'd', None)]
destination = spark.createDataFrame(dData, dCols)

nData = [('a', 'b', 1),
         ('c', 'd', 6),
         ('e', 'f', 7),
         ('g', 'h', None)]
new_data = spark.createDataFrame(nData, dCols)

In PySpark there is almost everything what is in SQL. But I don't find the equivalent for MERGE.


  • In SQL, MERGE can be replaced by left join union right join <=> full outer join:

    merged = destination.alias("dest").join(new_data.alias("src"), ["c1", "c2"], "full") \
        .selectExpr("c1", "c2", "coalesce(dest.d1, src.d1) as d1")
    #| c1| c2|  d1|
    #|  e|  f|   7|
    #|  g|  h|null|
    #|  c|  d|   6|
    #|  a|  b|   5|

    However, every time you perform this merge, you'll need to rewrite all your data into the destination as Spark doesn't support updates and it can lead to bad performance. So if you really need to do this I'll advise you to take look at Delta Lake which brings ACID transactions to spark, and which supports MERGE syntax.