apache-spark, pyspark, azure-databricks, spark-structured-streaming, delta-lake

Autoloader schema evolution using foreachBatch


I'm facing a schema evolution problem in my work process and I can't find a way to make it work.

Last week I had 5 columns enabled in the ERP system, and the business required adding a 6th column to the table.

My Delta table was created when I had only 5 columns, so I'm now running into a problem with the MERGE INTO operation, which can't deal with the extra column.

If I don't use the foreachBatch approach (which wraps the MERGE INTO in a function), I can solve the problem with df.writeStream.format("delta").option("overwriteSchema", True), which automatically adds the column. I tried to use that option together with foreachBatch, but the merge still fails, for obvious reasons.

My code:

    from delta.tables import DeltaTable

    def update_insert(df, epochId, cdm):
        deltaTable = DeltaTable.forPath(
            spark,
            f"abfss://{container_write}@{storage_write}.dfs.core.windows.net/D365/{cdm}" + "_ao",
        )
        (deltaTable.alias("table")
            .merge(df.alias("newData"), string)       # string: the merge condition
            .whenMatchedUpdate(set=dictionary)        # dictionary: column update map
            .whenNotMatchedInsert(values=dictionary)  # dictionary: column insert map
            .execute())



    df.writeStream.format("delta") \
        .option("overwriteSchema", True) \
        .foreachBatch(lambda df, epochId: update_insert(df, epochId, cdm)) \
        .option("checkpointLocation", checkpoint_directory) \
        .trigger(availableNow=True) \
        .start() \
        .awaitTermination()

Preferably, I want that extra column included in my Delta table too. How can I achieve that?


Solution

  • The mergeSchema and overwriteSchema options don't work with MERGE. Instead, you need to set the Spark conf property spark.databricks.delta.schema.autoMerge.enabled to true, as described in the MERGE documentation (see the sketch after this answer).

    P.S. You don't need .format("delta").option("overwriteSchema", True) with foreachBatch: the actual write happens inside your batch function, so options set on the stream writer itself have no effect on the merge.
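
Here is a minimal sketch of how that could look. It reuses the names from the question (container_write, storage_write, cdm, checkpoint_directory, df), but the join key table.id = newData.id is a hypothetical placeholder, and whenMatchedUpdateAll/whenNotMatchedInsertAll stand in for the asker's explicit set/values clauses, since the *All actions are what the Delta docs describe for automatic schema evolution during MERGE:

    from delta.tables import DeltaTable

    # Documented Delta conf that lets MERGE evolve the target schema.
    spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

    def update_insert(df, epochId, cdm):
        path = (f"abfss://{container_write}@{storage_write}"
                f".dfs.core.windows.net/D365/{cdm}_ao")
        deltaTable = DeltaTable.forPath(spark, path)
        (deltaTable.alias("table")
            # "table.id = newData.id" is a hypothetical merge condition.
            .merge(df.alias("newData"), "table.id = newData.id")
            # updateAll/insertAll let auto-merge pick up the new 6th column.
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute())

    # No .format("delta") or overwriteSchema here: the merge above does the writing.
    (df.writeStream
        .foreachBatch(lambda batch_df, epochId: update_insert(batch_df, epochId, cdm))
        .option("checkpointLocation", checkpoint_directory)
        .trigger(availableNow=True)
        .start()
        .awaitTermination())

With the conf enabled and the *All actions, a column that appears in the source batch but not in the target Delta table is added to the table on the next trigger.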