I have the below script, which I run in my Unity Catalog-enabled Databricks workspace, and I get the error below. The same schema and code worked for my other tenant in a different workspace, and I was hoping it would be the same for this tenant; right now I don't have time to look into the schema changes. Can anyone tell me what the error below suggests? What will happen if I put .option("mergeSchema", "true") in my code below?
Error:
org.apache.spark.sql.AnalysisException: A schema mismatch detected when writing to the Delta table (Table ID: 50a8bd1e32). To enable schema migration using DataFrameWriter or DataStreamWriter, please set: '.option("mergeSchema", "true")'. For other operations, set the session configuration spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation specific to the operation for details.
Code:
table_name = "main.auditlogs.Customer_Logs"
checkpoint_path = "/mnt/_checkpoint"
file_path = "/mnt/topics/audit-logs"
schema = "authenticationMechanism STRING, authenticationMechanismId STRING, environment STRUCT<cdfCluster: STRING, loggerInfo STRUCT<transport: STRING>, method STRING, path STRING, principalProjectId STRING, principalProjectIds ARRAY<STRING>, principalUniqueId STRING, principalUniqueName STRING, remoteIp STRING, requestId STRING, requestProjectId STRING, requestProjectUrlName STRING, requestTicket STRING, sourceCommitHash STRING, timestamp STRING, xHeaderFields ARRAY<STRUCT<key: STRING, value: STRING>>>"
(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .option("ignoreMissingFiles", "true")
    .schema(schema)
    .load(file_path)
    .writeStream
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .toTable(table_name))
Is there any way I can ignore the source schema and load everything as defined in the table I have created?
When you use .option("mergeSchema", "true"), the destination table schema will be updated to match the schema of your DataFrame: new columns and new fields inside structs will be added, and some column types may be widened to accommodate bigger data types.
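For the pipeline in the question, enabling schema migration is just one extra option on the writer. A sketch, reusing the table_name, checkpoint_path, file_path and schema variables from the question (this needs the Databricks runtime, since cloudFiles is Databricks-only):

```python
# Sketch: same Auto Loader pipeline, with Delta schema migration enabled on the writer.
# mergeSchema lets the write add new columns / new struct fields to the table schema.
(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .option("ignoreMissingFiles", "true")
    .schema(schema)
    .load(file_path)
    .writeStream
    .option("checkpointLocation", checkpoint_path)
    .option("mergeSchema", "true")  # allow the destination table schema to evolve
    .trigger(availableNow=True)
    .toTable(table_name))
```

Alternatively, as the error message says, you can set the session configuration spark.databricks.delta.schema.autoMerge.enabled to "true" instead of the per-write option.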
So you have the following choice:
- add .option("mergeSchema", "true") when writing the table; the table schema will then be updated, or
- use only the columns that already exist in the destination table, but then you will lose the new columns: just add .select(*orig_table.columns) between .load and .writeStream:
...
orig_table = spark.read.table(table_name)
...
    .load(file_path)
    .select(*orig_table.columns)
    .writeStream
...
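Putting that second option together with the original pipeline, the whole job would look roughly like this (a sketch, reusing the table_name, checkpoint_path, file_path and schema variables from the question; runnable only on Databricks because of cloudFiles):

```python
# Sketch: project the stream down to the destination table's existing columns,
# so no schema change is needed on write. New columns in the source are dropped.
orig_table = spark.read.table(table_name)

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .option("ignoreMissingFiles", "true")
    .schema(schema)
    .load(file_path)
    .select(*orig_table.columns)  # keep only columns the table already has
    .writeStream
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .toTable(table_name))
```

Note this assumes every column of the destination table is present in the incoming stream; a column that exists in the table but not in the source schema would make the select fail.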
You can find more information in this blog post.