I have a use case where I have a file that has "company_id" and "date". I have another S3 file from which I need to delete the data below year 2010, based on the "company_id" and "date" of the first file.
The second file has many columns, including "company_id", "date" and "year".
How can this be achieved? I tried the code below, but it gives an error.
Tried:
val source="s3a://source_table"
val target="s3a://target_table"
val source_delta = DeltaTable.forPath(spark, source)
val target_delta = DeltaTable.forPath(spark, target)
target_delta .alias("a")
.merge(
source_delta .alias("b"),
condition=( ($"b.company_id" === $"a.company_id") and ($"b.data" === $"a.date") and ($"a.year" <= "2010") )
)
.whenMatched().delete()
.execute()
println("Done")
Error:
command-4296412619890660:18: error: overloaded method value merge with alternatives:
(source: org.apache.spark.sql.DataFrame,condition: org.apache.spark.sql.Column)io.delta.tables.DeltaMergeBuilder <and>
(source: org.apache.spark.sql.DataFrame,condition: String)io.delta.tables.DeltaMergeBuilder
cannot be applied to (io.delta.tables.DeltaTable, condition: org.apache.spark.sql.Column)
.merge(
^
You need to use a DataFrame as the source, not a DeltaTable.
Convert the source to a DataFrame with .toDF, like below:
import io.delta.tables._
val source_delta = DeltaTable.forName(spark, "src")
val target_delta = DeltaTable.forName(spark, "tar")
target_delta.alias("a").merge(
source_delta.toDF.alias("b"),
"b.company_id=a.company_id")
.whenMatched().delete().execute()
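One way to check the result is to read the target back before and after running the merge and look at which company_id values remain; a small sketch:

target_delta.toDF
  .select("company_id")
  .distinct()
  .orderBy("company_id")
  .show()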
Rows with a matching company_id are removed. In the before/after screenshots, the source has company_id 3 and 4, so after the merge those rows are gone from the target.
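To apply the same fix to your original delete logic (match on company_id and date, and only remove target rows below year 2010), keep the target as a DeltaTable and pass the source as a DataFrame. A minimal sketch, assuming the source column is really named date (your condition uses b.data, which looks like a typo) and that year is numeric; your prose says "below 2010", so this uses < 2010 -- switch to <= if that is what you meant:

import io.delta.tables._
import spark.implicits._

val source = "s3a://source_table"
val target = "s3a://target_table"

// the source must be passed to merge as a DataFrame
val source_df    = DeltaTable.forPath(spark, source).toDF
val target_delta = DeltaTable.forPath(spark, target)

target_delta.alias("a")
  .merge(
    source_df.alias("b"),
    ($"b.company_id" === $"a.company_id") and
      ($"b.date" === $"a.date") and
      ($"a.year" < 2010))
  .whenMatched()
  .delete()          // delete only target rows that satisfy the whole condition
  .execute()

Only target rows matching the full condition are deleted; everything else in the target is untouched. Equivalently, you could keep just the join keys in the merge condition and move the year filter into whenMatched($"a.year" < 2010).delete() -- for a delete-only merge the result is the same.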