
how lazy is DeltaTable.toDF (Spark and delta.io)?


Suppose you do something like

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "...")

deltaTable.updateExpr(
  "column_name = value",
  Map("updated_column" -> "'new_value'"))

val df = deltaTable.toDF

Will df re-read the underlying Delta table contents on demand whenever it is accessed (e.g., df.count()) after the update, so that deltaTable.toDF is effectively equivalent to spark.read.format("delta").load(path)?

Or will it re-apply the series of updates whenever df is accessed?


Solution

  • Whenever you use .toDF() on a DeltaTable in Apache Spark, you get the most current snapshot of the table's data at the moment your code runs an action. This is due to the way the DeltaTable object is created and the way .toDF is evaluated: the resulting DataFrame is not pinned to a particular version. So when you update a DeltaTable and then read it with deltaTable.toDF(), Spark fetches the updated data at that moment.
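
    For instance, here is a minimal sketch of that behavior (the tablePath variable and the status column are assumptions for illustration):

    import io.delta.tables._

    val deltaTable = DeltaTable.forPath(spark, tablePath)
    val df = deltaTable.toDF // lazy: no data is read yet

    // Action 1: resolves and reads the snapshot that is current right now
    val doneBefore = df.filter("status = 'done'").count()

    // Mark some rows as done
    deltaTable.updateExpr(
      "status = 'pending'",
      Map("status" -> "'done'"))

    // Action 2: the same df now reflects the post-update snapshot,
    // so this count can be higher than doneBefore
    val doneAfter = df.filter("status = 'done'").count()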

    We can override this behavior using the time travel API; see Pinned reads below.

    Pinned reads

    Let's say you want to consistently read the same version of a Delta table more than once, even if the table gets updated in the meantime. You can 'pin' the table to a specific version like this:

    // Optional: Get the latest version of the table at the start of your session.
    val latestHistRow = DeltaTable.forPath(spark, tablePath)
                                  .history() // Retrieve table history
                                  .limit(1)  // Get the latest version
                                  .collect() // Eagerly evaluate to get the version info
                                  .apply(0)  // Pull the only Row out
    
    val pinnedVersion = latestHistRow.getLong(latestHistRow.fieldIndex("version"))
    
    // Use this version for reading the table
    val df = spark.read
                  .format("delta")
                  .option("versionAsOf", pinnedVersion) // "time travel"
                  .load(tablePath)
    

    This way, df will always represent the data of the Delta table at pinnedVersion, even if newer versions are available.
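
    As a quick sanity check, here is a sketch (the delete is only there to simulate a concurrent change, and the status column is again an assumption); it holds as long as the old data files have not been vacuumed:

    // The pinned df is unaffected by later writes
    val countBefore = df.count()

    // Simulate a concurrent change by another writer
    DeltaTable.forPath(spark, tablePath).delete("status = 'done'")

    // Re-running an action on df still reads version pinnedVersion
    val countAfter = df.count()
    assert(countBefore == countAfter)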

    And please remember that Delta Lake has data retention settings (and the VACUUM command) that determine how long old versions remain readable, so the efficacy of this approach depends on those settings.
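
    For example, you can inspect or extend the retention window via standard Delta Lake table properties; a hedged sketch (pick an interval that suits your workload):

    // Inspect current table properties, including any retention overrides
    spark.sql(s"SHOW TBLPROPERTIES delta.`$tablePath`").show(truncate = false)

    // Keep removed data files readable for 30 days so pinned reads keep working
    // (delta.logRetentionDuration similarly bounds how far back time travel can go)
    spark.sql(s"""
      ALTER TABLE delta.`$tablePath`
      SET TBLPROPERTIES ('delta.deletedFileRetentionDuration' = 'interval 30 days')
    """)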