I have CDF logic where I need to know how many rows were affected by a merge, i.e. the number of inserted, updated, and deleted rows, in order to make some decisions downstream. I am able to get the required information in SQL, but could not get it in PySpark.
from delta.tables import *
deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()
Yes, the .execute() call in the Python API doesn't return any value. But you can pull the necessary information using .history() on the Delta table. Just run

lastOperationDF = deltaTable.history(1)

on the table after the MERGE is done (assuming you don't have parallel operations). You need to look into the operationMetrics column; the table in the Delta Lake docs lists all the metrics available for the MERGE command: numTargetRowsInserted, numTargetRowsUpdated, numTargetRowsDeleted, etc.
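Putting it together, a minimal sketch of that approach (reusing the deltaTablePeople variable from the question and assuming no concurrent writes to the table) could look like this:

# history(1) returns a DataFrame with only the most recent operation (the MERGE above)
lastOperationDF = deltaTablePeople.history(1)

# operationMetrics is a map<string,string> column, so the counts come back as strings
metrics = lastOperationDF.select("operationMetrics").collect()[0]["operationMetrics"]

numInserted = int(metrics.get("numTargetRowsInserted", 0))
numUpdated = int(metrics.get("numTargetRowsUpdated", 0))
numDeleted = int(metrics.get("numTargetRowsDeleted", 0))

print(f"inserted={numInserted}, updated={numUpdated}, deleted={numDeleted}")

You can then branch on these counts to drive whatever decision your CDF logic needs.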