I'm working with Synapse notebooks and PySpark, and I'm trying to support schema evolution efficiently. The format doesn't necessarily have to be Delta tables, but it seems like a natural choice for incremental data loads.
This is the code I'm using:
from pyspark.sql.functions import lit

new_delta = spark.read.format('delta').load(delta_table_path)
new_delta.withColumn('test1', lit(None).cast('string')) \
    .withColumn('test2', lit(None).cast('string')) \
    .write.format('delta') \
    .mode('append').option('mergeSchema', 'true') \
    .save(delta_table_path)
Looking into the files, the older files have the old schema, and the new files have the new schema.
My expectation here would be that using the append mode and mergeSchema would only update the schema, without creating a full history of the data.
Running vacuum will clean up the old data, but I'd like to avoid (if possible) writing a potentially large volume of data to storage unnecessarily.
Using overwrite mode with overwriteSchema has the same behaviour as described above.
Am I misunderstanding how Delta tables work? Should I use a different approach if I'm trying to support schema evolution?
Yes, you're not doing it correctly - you read all your data, added columns, and then wrote all the data back in append mode, creating a copy of the data.
There are different approaches to adding new columns:
Instead of writing data in append mode, you can overwrite the existing data. But this will still duplicate files in storage.
Depending on the version of the Delta table you use, you can use built-in functions to add new columns - you need to use SQL syntax for that (see docs):
# Let's generate some data
>>> spark.range(10).write.format("delta").mode("append").saveAsTable("test")
>>> df = spark.read.table("test")
>>> df.printSchema()
root
|-- id: long (nullable = true)
# Update table
>>> spark.sql("ALTER TABLE test ADD COLUMNS (col1 int, col2 string)")
DataFrame[]
# Check that schema changed and we see new columns
>>> df = spark.read.table("test")
>>> df.printSchema()
root
|-- id: long (nullable = true)
|-- col1: integer (nullable = true)
|-- col2: string (nullable = true)
>>> df.show(2)
+---+----+----+
| id|col1|col2|
+---+----+----+
|  3|null|null|
|  4|null|null|
+---+----+----+
Write only the new data that contains the new columns, with the mergeSchema option set to true, and the schema will be updated. This is so-called automatic schema evolution.
I personally would recommend using the 2nd method.
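Since the question works with a table path rather than a registered table name, here is a minimal sketch of the second and third approaches for that case. It assumes your Delta version accepts the path-based identifier delta.`<path>` in ALTER TABLE; the helper names add_columns_sql and append_with_merge_schema are hypothetical, not part of any library.

```python
def add_columns_sql(table_path, new_columns):
    """Build an ALTER TABLE statement for a path-based Delta table.

    new_columns maps column name -> Spark SQL type, e.g. {"test1": "string"}.
    This is a metadata-only change: no data files are rewritten.
    """
    cols = ", ".join(f"`{name}` {dtype}" for name, dtype in new_columns.items())
    return f"ALTER TABLE delta.`{table_path}` ADD COLUMNS ({cols})"


def append_with_merge_schema(new_rows_df, table_path):
    """Append only the new rows; mergeSchema adds their extra columns to the table.

    new_rows_df is assumed to be a DataFrame holding just the incoming batch,
    not the data re-read from the table itself.
    """
    (new_rows_df.write.format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .save(table_path))


# Usage inside a notebook where `spark` and `delta_table_path` already exist:
# spark.sql(add_columns_sql(delta_table_path, {"test1": "string", "test2": "string"}))
# append_with_merge_schema(incoming_batch_df, delta_table_path)
```

In both cases the existing files are left untouched, and old rows simply read back as null for the new columns.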
P.S. There is also a possibility to rename & delete columns - you need to enable column mappings for that.
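A rough sketch of what enabling column mapping and then renaming or dropping a column could look like. The table property names and protocol versions below are the ones I remember from the Delta Lake documentation, so please verify them against the version you run. Note that upgrading the table protocol may make the table unreadable for older clients. The helper function names are made up for illustration.

```python
def enable_column_mapping_sql(table_name):
    """Statement that switches a Delta table to name-based column mapping."""
    return (
        f"ALTER TABLE {table_name} SET TBLPROPERTIES ("
        "'delta.minReaderVersion' = '2', "
        "'delta.minWriterVersion' = '5', "
        "'delta.columnMapping.mode' = 'name')"
    )


def rename_column_sql(table_name, old_name, new_name):
    """Statement that renames a column (requires column mapping to be enabled)."""
    return f"ALTER TABLE {table_name} RENAME COLUMN {old_name} TO {new_name}"


def drop_column_sql(table_name, column_name):
    """Statement that drops a column (requires column mapping to be enabled)."""
    return f"ALTER TABLE {table_name} DROP COLUMN {column_name}"


# Usage with the "test" table from the example above:
# spark.sql(enable_column_mapping_sql("test"))
# spark.sql(rename_column_sql("test", "col1", "col1_renamed"))
# spark.sql(drop_column_sql("test", "col2"))
```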