These are my tables: destination and new_data.
In Oracle SQL I can do this:
MERGE INTO destination d
USING new_data n
ON (d.c1 = n.c1 AND d.c2 = n.c2)
WHEN MATCHED THEN
UPDATE SET d.d1 = n.d1
WHERE d.d1 IS NULL
WHEN NOT MATCHED THEN
INSERT (c1, c2, d1)
VALUES (n.c1, n.c2, n.d1);
Then the destination table becomes this:

+---+---+----+
| c1| c2|  d1|
+---+---+----+
|  a|  b|   5|
|  c|  d|   6|
|  e|  f|   7|
|  g|  h|null|
+---+---+----+

If (c1, c2) already exists in destination and d1 is null, d1 gets updated. If (c1, c2) does not exist, the row gets inserted.
Is there a way to do the same in PySpark?
This generates the dataframes:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
dCols = ['c1', 'c2', 'd1']
dData = [('a', 'b', 5),
         ('c', 'd', None)]
destination = spark.createDataFrame(dData, dCols)

nData = [('a', 'b', 1),
         ('c', 'd', 6),
         ('e', 'f', 7),
         ('g', 'h', None)]
new_data = spark.createDataFrame(nData, dCols)
In PySpark there is almost everything that exists in SQL, but I can't find an equivalent for MERGE.
In SQL, MERGE can be replaced by a left join unioned with a right join, i.e. a full outer join. The coalesce keeps the existing d1 when it is not null and takes the new value otherwise, which matches the WHEN MATCHED ... WHERE d.d1 IS NULL rule:
merged = destination.alias("dest").join(new_data.alias("src"), ["c1", "c2"], "full") \
    .selectExpr("c1", "c2", "coalesce(dest.d1, src.d1) as d1")
merged.show()
#+---+---+----+
#| c1| c2| d1|
#+---+---+----+
#| e| f| 7|
#| g| h|null|
#| c| d| 6|
#| a| b| 5|
#+---+---+----+
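To persist this result you then have to overwrite the destination in full. A minimal sketch, assuming the destination is stored as Parquet at a hypothetical path (here the destination dataframe was built in memory; if it were read from that same path, write to a temporary location first and then swap):

# hypothetical output path for the destination table
merged.write.mode("overwrite").parquet("/tmp/destination")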
However, every time you perform this merge you have to rewrite all your data into the destination, because Spark doesn't support in-place updates, and that can lead to bad performance. So if you really need to do this, I'd advise you to take a look at Delta Lake, which brings ACID transactions to Spark and supports the MERGE syntax.
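For completeness, here is a minimal sketch of the same merge with Delta Lake's Python API, assuming Delta Lake is configured in your Spark session and the destination has already been saved as a Delta table at a hypothetical path:

from delta.tables import DeltaTable

# hypothetical location of the destination Delta table
dest_table = DeltaTable.forPath(spark, "/tmp/delta/destination")

(dest_table.alias("dest")
    .merge(new_data.alias("src"), "dest.c1 = src.c1 AND dest.c2 = src.c2")
    .whenMatchedUpdate(condition="dest.d1 IS NULL", set={"d1": "src.d1"})           # update only when d1 is null
    .whenNotMatchedInsert(values={"c1": "src.c1", "c2": "src.c2", "d1": "src.d1"})  # insert new keys
    .execute())

This mirrors the Oracle statement: the matched branch only touches rows where d1 is null, unmatched rows from new_data are inserted, and the untouched data does not have to be rewritten.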