pyspark, databricks, delta-lake, cdf, change-data-capture

How to know the number of rows affected by a CDF merge in PySpark?


I have CDF logic where I need to know the number of rows impacted by a merge, i.e. the number of inserted, updated, and deleted rows, in order to make some decisions. I am able to get the required information in SQL (see the sketch after my PySpark attempt below), but not in PySpark.


from delta.tables import *

deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()
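
For reference, here is a sketch of the SQL route (assuming Databricks, where MERGE INTO returns the affected-row counts as a result set):

# On Databricks, a SQL MERGE returns one row of counts:
# num_affected_rows, num_updated_rows, num_deleted_rows, num_inserted_rows.
result = spark.sql("""
    MERGE INTO delta.`/tmp/delta/people-10m` AS people
    USING delta.`/tmp/delta/people-10m-updates` AS updates
    ON people.id = updates.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")
result.show()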


Solution

  • Yes, .execute() in the Python API doesn't return any value, but you can pull the necessary information using .history() on the Delta table. Just do:

    lastOperationDF = deltaTable.history(1)
    

    on the table right after the MERGE completes (assuming you don't have parallel operations running against it). Look into the operationMetrics column - the Delta documentation lists all metrics available for the MERGE command: numTargetRowsInserted, numTargetRowsUpdated, numTargetRowsDeleted, etc.
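
    A minimal sketch of pulling those counts out (assuming the merge above has just completed and nothing else wrote to the table in between; every value in operationMetrics is a string, so cast before use):

    # Read the metrics map of the most recent operation on the table.
    lastOperationDF = deltaTablePeople.history(1)
    metrics = lastOperationDF.select("operationMetrics").collect()[0]["operationMetrics"]

    # operationMetrics values come back as strings; cast them to int.
    num_inserted = int(metrics.get("numTargetRowsInserted", 0))
    num_updated = int(metrics.get("numTargetRowsUpdated", 0))
    num_deleted = int(metrics.get("numTargetRowsDeleted", 0))

    print(f"inserted={num_inserted}, updated={num_updated}, deleted={num_deleted}")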