pyspark, databricks, azure-databricks, delta-lake, databricks-autoloader

Schema mismatch error in Databricks while reading file from storage account


I have the script below, which I run in my Unity Catalog-enabled Databricks workspace, and I get the error below. The same schema and code worked for another tenant in a different workspace, and I was hoping it would be the same for this tenant; right now I don't have time to look into the schema changes. Can anyone tell me what the error below suggests? What will happen if I put .option("mergeSchema", "true") in the code below?

Error:

org.apache.spark.sql.AnalysisException: A schema mismatch detected when writing to the Delta table (Table ID: 50a8bd1e32). To enable schema migration using DataFrameWriter or DataStreamWriter, please set: '.option("mergeSchema", "true")'. For other operations, set the session configuration spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation specific to the operation for details.

Code:

table_name = "main.auditlogs.Customer_Logs"
checkpoint_path = "/mnt/_checkpoint"
file_path = "/mnt/topics/audit-logs"
schema = "authenticationMechanism STRING,authenticationMechanismId STRING,environment STRUCT<cdfCluster: STRING>,loggerInfo STRUCT<transport: STRING>,method STRING,path STRING,principalProjectId STRING,principalProjectIds ARRAY<STRING>,principalUniqueId STRING,principalUniqueName STRING,remoteIp STRING,requestId STRING,requestProjectId STRING,requestProjectUrlName STRING,requestTicket STRING,sourceCommitHash STRING,timestamp STRING,xHeaderFields ARRAY<STRUCT<key: STRING, value: STRING>>"
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .option("ignoreMissingFiles", "true")
  .schema(schema)  
  .load(file_path)
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

Is there any way I can ignore the incoming schema and load everything using the schema of the table I have already created?


Solution

  • When you use .option("mergeSchema", "true"), the destination table schema will be updated to match the schema of your DataFrame: new columns and new fields inside structs will be added, and some column types may be changed to accommodate bigger data types.

    So you have the following choice:

    • add .option("mergeSchema", "true") when writing to the table; your table schema will then be updated (see the first sketch after this list)

    • use only the columns that already exist in the destination table, but then you will lose the new columns; just add .select(*orig_table.columns) between .load and .writeStream:

    ...
    orig_table = spark.read.table(table_name)
    
    ....
      .load(file_path)
      .select(*orig_table.columns)
      .writeStream
    ....
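
    For the first option, here is a minimal sketch of that approach. It reuses the table_name, checkpoint_path, file_path, and schema variables from the question and only adds .option("mergeSchema", "true") to the writer so that Delta can evolve the table schema:

    (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.schemaLocation", checkpoint_path)
      .option("ignoreMissingFiles", "true")
      .schema(schema)
      .load(file_path)
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .option("mergeSchema", "true")  # allow the streaming write to evolve the Delta table schema
      .trigger(availableNow=True)
      .toTable(table_name))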
    

    You can find more information in this blog post.
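
    The error message also mentions a session-level alternative. A minimal sketch, assuming you want schema evolution for every Delta write in the session rather than for a single writer:

    # enable automatic schema evolution for all Delta writes in this Spark session
    spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")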