When running a notebook in Databricks, it returns the following error: [UNKNOWN_FIELD_EXCEPTION.NEW_FIELDS_IN_FILE] Encountered unknown fields during parsing: [<field_name>], which can be fixed by an automatic retry: true
The documentation at https://docs.databricks.com/error-messages/unknown-field-exception-error-class.html is not clear on why that error occurs.
The DataFrame is configured with a schema location as follows.
from pyspark.sql.functions import col, current_timestamp

# set_safe_column_names, row_uuid and date_import_from_source_file are helper
# functions defined elsewhere in the notebook.
def bronze_ingestion(table_group, table_name, table_state):
    df = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.allowOverwrites", True)
        .option("multiLine", True)
        .option("quote", '"')
        .option("escape", '"')
        .option("pathGlobFilter", "*.csv")
        .option("cloudFiles.schemaLocation", f"{source_storage}/{source_base_folder}-schema/{project_code}/{table_group}/{table_name}/{table_state}")
        .load(f"{source_storage}/{source_base_folder}/{project_code}/{table_group}/{table_name}/{table_state}")
        .select(
            "*",
            col("_metadata.file_path").alias("source_file"),
            current_timestamp().alias("processing_time"),
        )
    )
    df = set_safe_column_names(df)
    df = df.withColumn("lake_id", row_uuid())
    df = df.withColumn("date_import", date_import_from_source_file(df["source_file"]))

    streamWriter = (
        df.writeStream
        .option("overwriteSchema", "true")
        .option("mergeSchema", "true")
        .option("checkpointLocation", f"{source_storage}/{source_base_folder}-checkpoint/{project_code}/{table_group}/{table_name}/{table_state}")
        .trigger(availableNow=True)
        .toTable(f"`{bronze_catalog}`.`{bronze_schema}`.`{table_group}_{table_name}_{table_state}`")
    )
    streamWriter.awaitTermination()
    return streamWriter
What can be causing that parsing error?
[UNKNOWN_FIELD_EXCEPTION.NEW_FIELDS_IN_FILE] Encountered unknown fields during parsing: [<field_name>]
The above error indicates that the structure of the data being read from the CSV files doesn't match the schema Auto Loader has inferred and is tracking in its schema location.
According to the documentation you linked, this sub-class is raised when unknown fields are found inside a file at the reported path.
For example, I have a CSV file that I updated with 2 new columns and uploaded to the samplefiles directory in ADLS.
When the file is read from the source and Auto Loader finds new columns that do not match the tracked schema, it fails the stream with this exception and records the new columns in the schema location; restarting the stream (the "which can be fixed by an automatic retry: true" part of the message) then picks them up. The related sub-class in the documentation reports new columns found in a record's schema as <newColumnsInRecordSchema>, and Auto Loader can also capture unexpected fields in its rescued data column.
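How the stream reacts to these new columns is controlled by Auto Loader's schema evolution mode. A minimal sketch of the relevant option follows; the abfss paths are placeholders, not the paths from the question:

# Minimal sketch (placeholder paths): choosing how Auto Loader handles new columns.
df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    # "addNewColumns" (default): fail with UNKNOWN_FIELD_EXCEPTION, record the new
    #   columns in the schema location, and use them after the stream restarts.
    # "rescue": keep the schema fixed and put unexpected fields into _rescued_data.
    # "failOnNewColumns": fail the stream and do not evolve the schema.
    # "none": ignore new columns.
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .option("cloudFiles.schemaLocation", "abfss://container@account.dfs.core.windows.net/schema/")
    .load("abfss://container@account.dfs.core.windows.net/source/")
)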
Source file:
In the code below, I read the source file (which has the 2 extra columns) from ADLS, add 1 more column in code, and write the result in Delta format to the Autoloader folder in ADLS.
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from delta.tables import DeltaTable

# Enable automatic schema merging for Delta writes when the session is created.
spark = (
    SparkSession.builder
    .appName("DeltaWithAutomaticRetries")
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
    .getOrCreate()
)

spark.conf.set(
    "fs.azure.account.key.adlsgen2may15.dfs.core.windows.net",
    "ttCHZNZsg0M3x/WzGArHAh8ZPWKtVL8mMkct5DFAODJ46/yBA/T8U39xQ0obL12bGAT7XF1Hy4jw+AStkqafOQ=="
)

source_location = 'abfss://folder02@adlsgen2may15.dfs.core.windows.net/samplefiles/'
target_location = 'abfss://folder02@adlsgen2may15.dfs.core.windows.net/Autoloader/'
checkpoints_loc = 'abfss://folder02@adlsgen2may15.dfs.core.windows.net/Checkpoints/'
schema_loc = 'abfss://folder02@adlsgen2may15.dfs.core.windows.net/schema/'

# Read the CSV files with Auto Loader; the inferred schema is tracked in schema_loc.
df = (
    spark.readStream.format('cloudFiles')
    .option('cloudFiles.format', 'CSV')
    .option('cloudFiles.schemaLocation', schema_loc)
    .load(source_location)
)

# Add one more column on top of the columns coming from the source file.
df_with_new_field = df.withColumn("new_column", lit("new_value"))

# Write the stream in Delta format to the Autoloader folder in ADLS.
query = (
    df_with_new_field.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoints_loc)
    .outputMode("append")
    .start(target_location)
)
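When the file with the 2 extra columns arrives, this stream stops with the UNKNOWN_FIELD_EXCEPTION; because the schema location has already been updated, restarting the read and write picks up the new columns. A small illustrative sketch (not part of the original code) of surfacing that failure:

# Illustrative only: block on the query and surface the schema-evolution failure.
try:
    query.awaitTermination()
except Exception as e:
    # Auto Loader raises this when new columns appear; the schema location has been
    # updated, so re-running the readStream/writeStream cell (or letting a Databricks
    # Job retry the task) continues with the evolved schema.
    print(f"Stream stopped, restart it to pick up the evolved schema: {e}")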
The configuration below enables automatic schema merging (schema evolution) for Delta Lake writes, so that when the stream is retried after the exception the new columns are merged into the target schema:
SparkSession.builder.appName("DeltaWithAutomaticRetries").config("spark.databricks.delta.schema.autoMerge.enabled", "true").getOrCreate()
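The same setting can also be turned on for an already running session, which may be more convenient in a notebook; the mergeSchema option used in the question's writeStream serves a similar per-write purpose:

# Equivalent runtime setting for an existing SparkSession.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")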
The image below shows the 2 columns from the source file together with the 1 new column added in the code.
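As an alternative to the screenshot, the same thing can be verified by reading the Delta output back and inspecting its columns (using the target_location defined above):

# Verify the merged schema: the source columns plus "new_column" should be present.
result = spark.read.format("delta").load(target_location)
result.printSchema()
result.show(5)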