Tags: apache-spark-sql, databricks, azure-databricks, delta-lake

Joining two tables for a delete using DeltaTable


I have a use case where one S3 file has "company_id" and "date" columns. I have another S3 file from which I need to delete the data from before year 2010, matching on the "company_id" and "date" of the first file.

The second file has many columns, including "company_id", "date", and "year".

How can this be achieved? I tried the following, but it gives an error.

Tried:

import io.delta.tables._

val source = "s3a://source_table"
val target = "s3a://target_table"

val source_delta = DeltaTable.forPath(spark, source)
val target_delta = DeltaTable.forPath(spark, target)

target_delta.alias("a")
  .merge(
    source_delta.alias("b"),
    condition = ($"b.company_id" === $"a.company_id") and ($"b.date" === $"a.date") and ($"a.year" <= "2010")
  )
  .whenMatched().delete()
  .execute()

println("Done")

Error:

command-4296412619890660:18: error: overloaded method value merge with alternatives:
  (source: org.apache.spark.sql.DataFrame,condition: org.apache.spark.sql.Column)io.delta.tables.DeltaMergeBuilder <and>
  (source: org.apache.spark.sql.DataFrame,condition: String)io.delta.tables.DeltaMergeBuilder
 cannot be applied to (io.delta.tables.DeltaTable, condition: org.apache.spark.sql.Column)
      .merge(
       ^
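The error message lists the only two supported overloads: both take a DataFrame as the source, with either a Column or a String condition. The call above passes a DeltaTable as the source, so neither overload applies.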

Solution

  • You need to pass a DataFrame as the source of merge, not a DeltaTable.

    Convert the source with .toDF, as below.

    import io.delta.tables._

    val source_delta = DeltaTable.forName(spark, "src")
    val target_delta = DeltaTable.forName(spark, "tar")

    // .toDF converts the DeltaTable into the DataFrame that merge expects
    target_delta.alias("a").merge(
        source_delta.toDF.alias("b"),
        "b.company_id = a.company_id")
      .whenMatched().delete().execute()
    

    Matched company_id rows are removed.

    [Screenshot: target table before the merge]

    [Screenshot: target table after the merge]

    The source has company_id 3 and 4, so those rows are deleted from the target. A fuller sketch applying this fix to the original question follows below.
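Putting it together for the original use case, here is a minimal sketch. It assumes the s3a paths from the question, that both tables share company_id and date columns, and that year in the target is numeric. The question's prose says "below year 2010" while the attempt used <=, so adjust the comparison to whichever is intended:

    import io.delta.tables._
    import org.apache.spark.sql.functions.col

    val source = "s3a://source_table"
    val target = "s3a://target_table"

    val source_delta = DeltaTable.forPath(spark, source)
    val target_delta = DeltaTable.forPath(spark, target)

    // Pass the source as a DataFrame via .toDF; match on both keys and
    // restrict the delete to target rows from before 2010.
    target_delta.alias("a")
      .merge(
        source_delta.toDF.alias("b"),
        col("b.company_id") === col("a.company_id") &&
          col("b.date") === col("a.date") &&
          col("a.year") < 2010
      )
      .whenMatched().delete()
      .execute()

Only target rows that match a source row on both keys and satisfy the year filter are deleted; everything else is left untouched.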