Suppose you do something like
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "...")
deltaTable.updateExpr(
  "column_name = value",
  Map("updated_column" -> "'new_value'"))
val df = deltaTable.toDF
Will df re-read the underlying Delta table contents on demand whenever it is accessed (e.g., df.count()), post-update? Such that deltaTable.toDF is effectively equivalent to spark.read.format("delta").load(path)?
Or will it re-apply the series of updates whenever df is accessed?
Whenever you use .toDF() on a DeltaTable in Apache Spark, you get the most current snapshot of the table's data at the moment you execute your code. This is because neither the DeltaTable object nor .toDF pins to a particular version: each query resolves the table's latest snapshot. So when you update a DeltaTable and then read it with deltaTable.toDF(), Spark fetches the updated data at that moment. In that sense it is effectively equivalent to spark.read.format("delta").load(path); it does not replay the series of updates, it simply reads whatever the latest table version contains.
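To make this concrete, here is a minimal sketch (assuming an active SparkSession, an existing Delta table at tablePath, and a hypothetical status column; the predicate and values are illustrative) showing that a DataFrame obtained from toDF reflects updates applied after it was created:

```scala
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, tablePath)
val df = deltaTable.toDF // lazy: no data is read yet

val countBefore = df.count() // scans the table's latest snapshot at this moment

// Update the table after df was created.
deltaTable.updateExpr(
  "status = 'pending'",                // hypothetical predicate
  Map("status" -> "'processed'"))      // hypothetical new value

// A later action on the same df resolves the latest snapshot again,
// so it observes the update rather than the state at df's creation.
val countAfter = df.count()
```

If you need repeatable reads across such updates, use the version pinning shown below instead.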
You can override this behavior with the time travel API; see Pinned reads below.
Pinned reads
Let's say you want to consistently read from the same version of a Delta table more than one time, even if the table gets updated in the meantime. You can 'pin' the table to a specific version like this:
// Optional: Get the latest version of the table at the start of your session.
val latestHistRow = DeltaTable.forPath(spark, tablePath)
.history() // Retrieve table history
.limit(1) // Get the latest version
.collect() // Eagerly evaluate to get the version info
.apply(0) // Pull the only Row out
val pinnedVersion = latestHistRow.getLong(latestHistRow.fieldIndex("version"))
// Use this version for reading the table
val df = spark.read
.format("delta")
.option("versionAsOf", pinnedVersion) // "time travel"
.load(tablePath)
This way, df will always represent the data of the Delta table at pinnedVersion, even if newer versions are available.
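If you would rather pin by time than by version number, Delta's time travel also accepts a timestamp via the timestampAsOf option (tablePath as above; the timestamp string is illustrative):

```scala
// Pin to the table state as of a given timestamp instead of a version number.
val dfAsOfTime = spark.read
  .format("delta")
  .option("timestampAsOf", "2024-01-01 00:00:00") // illustrative timestamp
  .load(tablePath)
```

Delta resolves the timestamp to the latest commit at or before that time, so this is convenient when you know when a run started but not which version was current.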
And please remember that Delta Lake has data retention settings that determine how long old versions are kept (and VACUUM can physically remove the files backing old versions), so the efficacy of this approach depends on those settings.
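Those retention windows are controlled by table properties. A sketch of adjusting them, assuming the standard Delta property names (the interval values here are illustrative, not recommendations):

```scala
// Keep commit history (and thus time travel metadata) for 30 days, and
// retain files removed from the current version for 7 days before VACUUM
// is allowed to delete them. Time travel to a version only works while
// both its log entries and its data files are still retained.
spark.sql(s"""
  ALTER TABLE delta.`$tablePath` SET TBLPROPERTIES (
    'delta.logRetentionDuration' = 'interval 30 days',
    'delta.deletedFileRetentionDuration' = 'interval 7 days'
  )
""")
```

Lengthening these intervals extends how far back pinned reads can reach, at the cost of keeping more data files on storage.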