I have Parquet files with an evolving schema, and I need to load all of them into a single Delta table. My goal is to use Auto Loader with schemaEvolutionMode = "rescue", so that any fields in the source that do not align with the target schema fall into the "_rescued_data" column. I also provide .schema(target_schema) to Auto Loader.
But when I read some of the files, I get this error:
Invalid Spark read type: expected optional group my_column (LIST) { repeated group list { optional binary element (STRING); } } to be list but found Some(StringType)
my_column has data type String in the target table. So why was it not loaded into the _rescued_data column instead of raising an error?
The code I'm using:
read_options = {
    "cloudFiles.format": "parquet",
    "cloudFiles.schemaLocation": "some location",
    "cloudFiles.schemaEvolutionMode": "rescue"
}

(spark.readStream.format("cloudFiles")
    .options(**read_options)
    .schema(target_schema)
    .load("source_path")
    .writeStream
    .foreachBatch(<save function>)
    .outputMode("append")
    .trigger(availableNow=True)
    .start())
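
For reference, here is roughly what the <save function> passed to foreachBatch could look like. This is a hypothetical sketch only; the table name target_table and the mergeSchema option are my assumptions, not part of the actual code:

def save_batch(batch_df, batch_id):
    # Hypothetical save function, for illustration only: append each
    # micro-batch to the target Delta table.
    (batch_df.write
        .format("delta")
        .mode("append")
        .option("mergeSchema", "true")   # assumed: lets _rescued_data be added to the table schema
        .saveAsTable("target_table"))    # assumed table name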
Databricks Runtime version is 13.2 (Spark 3.4.0, Scala 2.12).
The reason for the error was that the _rescued_data column was being cleared after reading the DataFrame (before writing).
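
In terms of the hypothetical save_batch sketch above, I assume "cleared" means the column was dropped before the write; keeping _rescued_data in the micro-batch until it has been written preserves the values that did not match target_schema:

def save_batch(batch_df, batch_id):
    # Assumed problematic pattern: dropping the rescue column before writing,
    # which discards the mismatched source fields.
    # batch_df = batch_df.drop("_rescued_data")

    # Keep _rescued_data through the write so those fields survive as a JSON string.
    batch_df.write.format("delta").mode("append").saveAsTable("target_table")  # assumed table name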