pyspark, sql-delete, azure-synapse

Delete / overwrite rows based on matching keys in Spark


I have two tables in a Lake Database in Synapse (Parquet), which I work with from Spark: tableA has 10 billion rows and tableB has 10 million rows. I want to delete the rows in tableA that match tableB's 10 million rows on the key columns and replace them with the new data from tableB.

For reference, this was the SQL in my old stored procedure (the exact code isn't important); I now need the same thing in Spark:

Delete data1 
from TableA data1
where exists (
   select 1
   from TableB data2
   where data1.Country = data2.Country
   and data1.Year = data2.Year
   and data1.Month = data2.Month
   and data1.Store_cd = data2.Store_cd
   and data1.SKU = data2.SKU
);

The usual Spark suggestion is to load the table as a DataFrame and filter it, which isn't feasible with 10 billion rows.

I tried a simple DELETE, which fails with:

[UNSUPPORTED_FEATURE.TABLE_OPERATION] The feature is not supported: Table spark_catalog.dbo.table1 does not support DELETE. Please check the current catalog and namespace to make sure the qualified table name is expected, and also check the catalog implementation which is configured by "spark.sql.catalog".


Solution

  • It depends on what kind of table you have (can you share the DDL?) and how it's partitioned.

    If it's a table in a Synapse dedicated SQL pool, you can use Synapse's T-SQL MERGE statement.

    If you're using plain Apache Spark on Parquet tables, there is no row-level DELETE; you can only rewrite data with INSERT (OVERWRITE) or LOAD DATA, so how much you have to rewrite depends on how your data is partitioned. You might end up rewriting the whole table (e.g. if tableB has data for every partition in tableA).

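    E.g. on a Delta table (INSERT INTO ... REPLACE WHERE is Delta Lake syntax, documented for Databricks, not plain Apache Spark SQL):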

    -- in an atomic operation, 1) delete rows with ssn = 123456789 and 
    -- 2) insert rows from persons2 
    INSERT INTO persons REPLACE WHERE ssn = 123456789 SELECT * FROM persons2
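For the plain-Spark route (Parquet, no Delta), a minimal PySpark sketch of the partition-rewrite idea might look like this. It rests on assumptions the question doesn't state: tableA is a Parquet datasource table partitioned by Country, Year, Month (hypothetical PARTITION_COLS, which must be a subset of the keys), tableB has the same columns, and you may create a scratch table (the hypothetical `staging` argument). It keeps the rows of the touched partitions whose keys are not in tableB, adds tableB's rows, stages the result (Spark normally refuses to overwrite a table that the same query reads from), then overwrites only those partitions using dynamic partition overwrite.

```python
from typing import Sequence

# Key columns from the question's WHERE EXISTS clause.
KEY_COLS = ["Country", "Year", "Month", "Store_cd", "SKU"]
# ASSUMPTION (hypothetical): tableA is partitioned by these columns.
# They must be a subset of KEY_COLS for the logic below to be correct.
PARTITION_COLS = ["Country", "Year", "Month"]


def replace_matching_rows(spark, target: str, updates: str, staging: str,
                          key_cols: Sequence[str] = KEY_COLS,
                          partition_cols: Sequence[str] = PARTITION_COLS) -> None:
    """Replace rows of `target` whose keys appear in `updates`.

    Only the partitions that `updates` touches are rewritten. Intended for
    plain Parquet tables; `staging` is a hypothetical scratch table name.
    """
    target_df = spark.table(target)
    columns = target_df.columns  # partition columns come last in a partitioned table
    updates_df = spark.table(updates).select(*columns)

    # Partition values that the update batch touches.
    touched = updates_df.select(*partition_cols).distinct()

    # Existing rows in those partitions whose keys are NOT being replaced.
    kept = (target_df
            .join(touched, on=list(partition_cols), how="left_semi")
            .join(updates_df.select(*key_cols), on=list(key_cols), how="left_anti"))

    # Stage the new contents of the touched partitions first, because Spark
    # won't overwrite a table that the same query is reading from.
    (kept.unionByName(updates_df)
         .write.mode("overwrite")
         .format("parquet")
         .partitionBy(*partition_cols)
         .saveAsTable(staging))

    # Dynamic mode overwrites only the partitions present in the staged data.
    # Note: this setting stays in effect for the rest of the Spark session.
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    # insertInto matches columns by position, so select in the target's order.
    spark.table(staging).select(*columns).write.insertInto(target, overwrite=True)
    spark.sql(f"DROP TABLE IF EXISTS {staging}")
```

Usage would be something like replace_matching_rows(spark, "dbo.tableA", "dbo.tableB", "dbo.tableA_staging") (hypothetical names). Check the physical plan to confirm that only the touched partitions of tableA are scanned; if not, collect the distinct partition values from tableB and filter tableA on them explicitly. Unlike Delta, the final overwrite is not atomic, so a failure during that step can leave the touched partitions incomplete.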
    

    It might be easier, and a little more platform-independent, to use the Delta table format as Jon suggested, in which case you can simply use the MERGE INTO statement. Something like:

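    -- pk1/pk2 stand in for the key columns (Country, Year, Month, Store_cd, SKU),
    -- col1/col2 for the remaining columns. tableA must be a Delta table.
    MERGE INTO tableA
    USING tableB
    ON tableA.pk1 = tableB.pk1 AND tableA.pk2 = tableB.pk2
    WHEN MATCHED
      THEN UPDATE SET
        pk1  = tableB.pk1,
        pk2  = tableB.pk2,
        col1 = tableB.col1,
        col2 = tableB.col2
    WHEN NOT MATCHED
      THEN INSERT (pk1, pk2, col1, col2)
      VALUES (
        tableB.pk1,
        tableB.pk2,
        tableB.col1,
        tableB.col2
      )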