pyspark, azure-databricks

Databricks UNKNOWN_FIELD_EXCEPTION.NEW_FIELDS_IN_FILE


When running a notebook in Databricks, it fails with the following error: [UNKNOWN_FIELD_EXCEPTION.NEW_FIELDS_IN_FILE] Encountered unknown fields during parsing: [<field_name>], which can be fixed by an automatic retry: true

The documentation here https://docs.databricks.com/error-messages/unknown-field-exception-error-class.html is not clear about why that error occurs.

The DataFrame is configured to use a schema location as follows.

from pyspark.sql.functions import col, current_timestamp

def bronze_ingestion(table_group, table_name, table_state):
    df = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.allowOverwrites", True)
        .option("multiLine", True)
        .option("quote", '"')
        .option("escape", '"')
        .option("pathGlobFilter", "*.csv")
        .option("cloudFiles.schemaLocation", f"{source_storage}/{source_base_folder}-schema/{project_code}/{table_group}/{table_name}/{table_state}")
        .load(f"{source_storage}/{source_base_folder}/{project_code}/{table_group}/{table_name}/{table_state}")
        .select("*", 
            col("_metadata.file_path").alias("source_file"),
            current_timestamp().alias("processing_time")
        )
    )

    df = set_safe_column_names(df)

    df = df.withColumn("lake_id", row_uuid())
    df = df.withColumn("date_import", date_import_from_source_file(df['source_file']))

    streamWriter = (df.writeStream
        .option("overwriteSchema", "true")
        .option("mergeSchema", "true")
        .option("checkPointLocation", f"{source_storage}/{source_base_folder}-checkpoint/{project_code}/{table_group}/{table_name}/{table_state}")
        .trigger(availableNow=True)
        .toTable(f"`{bronze_catalog}`.`{bronze_schema}`.`{table_group}_{table_name}_{table_state}`")
    )

    streamWriter.awaitTermination()

    return streamWriter

What could be causing that parsing error?


Solution

  • The error [UNKNOWN_FIELD_EXCEPTION.NEW_FIELDS_IN_FILE] Encountered unknown fields during parsing: [<field_name>] indicates that the structure of the data being read from the CSV files doesn't match the schema that Auto Loader has inferred and stored at the cloudFiles.schemaLocation.

    According to the documentation you linked, this error is raised when unknown fields are found inside a file ("Unknown fields inside file path"):

    For example, I have a CSV file that I updated with two new columns and uploaded to the samplefiles directory in ADLS.

    When the file is read from the source and Auto Loader finds new columns that don't match the stored schema, it raises this exception and reports the rescued record schema (Rescued record schema: <newColumnsInRecordSchema>) together with the source file, as shown below; a sketch of controlling this behaviour explicitly follows the screenshot. Source file:

    (screenshot: the updated source file with the new columns)
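    How Auto Loader reacts to those new columns is controlled by its cloudFiles.schemaEvolutionMode option (addNewColumns is the default when a schema location is set). As a minimal sketch against the question's bronze_ingestion reader, reusing only its path variables and nothing else, you could spell that mode out explicitly, or switch to rescue mode so unexpected columns land in _rescued_data instead of stopping the stream:

    # Sketch only: the reader from the question with the schema-evolution mode spelled out.
    df = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        # "addNewColumns" (default): stop the stream once, add the new columns to the
        # schema location, and pick them up on the next start.
        # "rescue": never fail on new columns; they are kept in the _rescued_data column.
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .option("cloudFiles.schemaLocation", f"{source_storage}/{source_base_folder}-schema/{project_code}/{table_group}/{table_name}/{table_state}")
        .load(f"{source_storage}/{source_base_folder}/{project_code}/{table_group}/{table_name}/{table_state}")
    )

    With addNewColumns the failure you are seeing is expected exactly once per schema change, which is why the message ends with "which can be fixed by an automatic retry: true".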

    In the code below I read the source file (which now has the two extra columns) from ADLS, add one new column in code, and write the result to the Autoloader folder in ADLS in Delta format.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import lit

    spark = (SparkSession.builder
        .appName("DeltaWithAutomaticRetries")
        .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
        .getOrCreate())

    # Storage account access key redacted; use a secret scope or your own credential here.
    spark.conf.set("fs.azure.account.key.adlsgen2may15.dfs.core.windows.net", "<storage-account-access-key>")

    source_location = 'abfss://folder02@adlsgen2may15.dfs.core.windows.net/samplefiles/'
    target_location = 'abfss://folder02@adlsgen2may15.dfs.core.windows.net/Autoloader/'
    checkpoints_loc = 'abfss://folder02@adlsgen2may15.dfs.core.windows.net/Checkpoints/'
    schema_loc = 'abfss://folder02@adlsgen2may15.dfs.core.windows.net/schema/'

    # Read the CSV files with Auto Loader, tracking the inferred schema in schema_loc.
    df = (spark.readStream.format('cloudFiles')
        .option('cloudFiles.format', 'csv')
        .option('cloudFiles.schemaLocation', schema_loc)
        .load(source_location)
    )

    # Add one new column in code and write the stream to ADLS in Delta format.
    df_with_new_field = df.withColumn("new_column", lit("new_value"))
    query = (df_with_new_field.writeStream
        .format("delta")
        .option("checkpointLocation", checkpoints_loc)
        .outputMode("append")
        .start(target_location)
    )
    

    The configuration below enables automatic schema evolution (auto-merge) in Delta Lake, which is what lets the retried write succeed with the new columns:

    SparkSession.builder.appName("DeltaWithAutomaticRetries").config("spark.databricks.delta.schema.autoMerge.enabled", "true").getOrCreate()
    
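    In a Databricks notebook the SparkSession already exists, so as a sketch (not something shown in the original answer) the same property can be set on the running session, or schema evolution can be enabled only for this write through Delta's mergeSchema option:

    # Sketch: equivalent settings on an already-running Databricks session.
    spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

    # Or scope schema evolution to this single Delta write:
    query = (df_with_new_field.writeStream
        .format("delta")
        .option("mergeSchema", "true")  # let the target table pick up the new columns
        .option("checkpointLocation", checkpoints_loc)
        .outputMode("append")
        .start(target_location)
    )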

    The image below shows the two columns from the source file and the one new column added in the code.

    (screenshot: the target Delta table with the two source columns and the column added in code)
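    Finally, because the stream in the question runs with trigger(availableNow=True), the run that first sees the new fields stops with this error and the next run succeeds against the evolved schema. A retry policy on the Databricks Job handles that automatically; the hypothetical wrapper sketched below (bronze_ingestion_with_retry is not part of the original code) does the same thing inline:

    import time

    def bronze_ingestion_with_retry(table_group, table_name, table_state, max_attempts=2):
        # Hypothetical helper: call the question's bronze_ingestion() and retry once
        # when the stream stops because new fields were added to the schema location.
        for attempt in range(1, max_attempts + 1):
            try:
                return bronze_ingestion(table_group, table_name, table_state)
            except Exception as e:
                if "UNKNOWN_FIELD_EXCEPTION" in str(e) and attempt < max_attempts:
                    time.sleep(5)  # brief pause, then rerun against the evolved schema
                    continue
                raise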