databricks, azure-databricks, spark-structured-streaming, delta-lake

Databricks - readStream from Delta table, writeStream to ORC file with only the changes


A pipeline runs every 20 minutes pushing data to ADLS Gen2 storage in ORC format.

  1. I have an Azure Databricks notebook job which runs every hour. This job reads the ORC files from ADLS as a structured stream (the ORC files created by the pipeline mentioned above), then uses the merge functionality to upsert data into a Delta table based on a primaryKey column.
event_readstream = ( 
    spark.readStream
    .format("orc")
    .schema('my-schema.json')
    .load('/mnt/path/from/adls/to/orc/files/')
  )
  ...
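
The .schema('my-schema.json') call above looks like a placeholder: DataStreamReader.schema() expects a StructType or a DDL-formatted string, not a file name. A minimal sketch of loading a schema that was saved as StructType JSON (the file name and path are assumptions) could look like this:

# Sketch only: rebuild a StructType from a JSON schema file (path is hypothetical)
import json
from pyspark.sql.types import StructType

with open('/dbfs/mnt/path/to/my-schema.json') as f:
    event_schema = StructType.fromJson(json.load(f))

# then pass .schema(event_schema) instead of .schema('my-schema.json') in the readStream above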

# Batch handler for foreachBatch; assumes `import pyspark.sql.functions as F`
# and `partition_column` are defined earlier in the notebook
def upsertEventToDeltaTable(df, batch_id): 
  input_df = (
    df
    .drop('address_line3')
    .dropDuplicates(['primaryKey'])
    .withColumn(partition_column, F.date_format(F.col('updateddate'), 'yyyy').cast('int'))
  )
  
  input_df.createOrReplaceTempView("event_df_updates")
  
  # Run the MERGE through the SparkSession of the micro-batch DataFrame
  input_df._jdf.sparkSession().sql("""
    MERGE INTO events dbEvent
    USING event_df_updates dfEvent
    ON dbEvent.primaryKey = dfEvent.primaryKey
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *                     
                             """)

event_writestream = ( 
    event_readstream
    .drop('address_line3')  # some column to be dropped
    .repartition(1)
    .writeStream
    .trigger(once=True)
    .format("delta")
    .option("checkpointLocation", "{}/checkpoints/{}".format(adls_mount_path,mytable))
    .foreachBatch(upsertEventToDeltaTable)
    .start()
  )
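
For completeness, the MERGE above assumes the target events Delta table already exists. A minimal sketch of creating it up front (the schema and partition column name are assumptions; the location matches the Delta table path used later in this post) might be:

# Sketch only: one-time creation of the target table (column names/types are assumed)
spark.sql("""
  CREATE TABLE IF NOT EXISTS events (
    primaryKey STRING,
    updateddate TIMESTAMP,
    year INT
  )
  USING DELTA
  PARTITIONED BY (year)
  LOCATION '/mnt/delta/myadlsaccnt/user_events'
""")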
  2. The same notebook also uses the same read stream (structured stream) and writes the data directly to a different location in ADLS Gen2 storage. This also uses foreachBatch() from the write stream, with the checkpoint option enabled.

# Batch handler for foreachBatch: appends each micro-batch as ORC, partitioned by updateddate
def loadToLocation(df, batch_id): 
  (
    df
    .repartition(1)
    .write
    .partitionBy('updateddate')
    .format("orc")
    .mode("append")
    .save('{}/event/data'.format(adls_mount_path))  
  )

location_writestream = ( 
  event_readstream   # Same read stream is used here 
  .writeStream
  .trigger(once=True)
  .option("checkpointLocation", "{}/checkpoints/event".format(adls_mount_path))
  .foreachBatch(loadToLocation)
  .start()
)
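
Since both queries use trigger(once=True), the notebook job can simply block until each has processed its single batch before the job exits:

# Wait for both trigger-once streaming queries to finish
event_writestream.awaitTermination()
location_writestream.awaitTermination()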

Question:

  • The Delta table is upserted every hour in my case. If I create a new readStream (reading from the Delta table) and a writeStream (writing to an ORC file), will the ORC file contain only the changes that got merged into the Delta table? [Details below]
  • Are there any issues with this approach, if only the changed or updated data is written to the ORC file?

In the above point #2, instead of using the readStream that reads from the ORC files, create a new readStream using the Delta table path like below

deltatbl_event_readstream = (
  spark.readStream
    .format("delta")
    .load("/mnt/delta/myadlsaccnt/user_events")  # my delta table location
)
  • and use a different write stream like below
def loadToLocation(df, batch_id): 
  (
    df
    .repartition(1)
    .write
    .partitionBy('updateddate')
    .format("orc")
    .mode("append")
    .save('{}/event/data'.format(adls_mount_path))  
  )

(
  deltatbl_event_readstream
    .writeStream
    .trigger(once=True)
    .option("checkpointLocation", "{}/checkpoints/event".format(adls_mount_path))
    .foreachBatch(loadToLocation)  # re-using the same method
    .start()
)
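
For context on this variant: as far as I understand, a plain readStream over a Delta table treats it as an append-only source. Once the hourly MERGE starts updating existing rows, the stream fails unless an option such as ignoreChanges is set, and with ignoreChanges Delta re-emits whole rewritten files, so the downstream ORC output would contain more than just the merged changes. A hedged sketch of that option:

# Sketch only: plain Delta source with ignoreChanges -- rewritten files are re-emitted,
# so this does NOT limit the output to only the changed rows
deltatbl_event_readstream = (
  spark.readStream
    .format("delta")
    .option("ignoreChanges", "true")  # needed once the source table receives updates via MERGE
    .load("/mnt/delta/myadlsaccnt/user_events")
)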

Solution

  • I came across this DATA+AI Summit link, which has a demo for such a scenario.

    In my case, each batch has >90% new rows and few updates, so I can't use this option. It might help others, though.

    Below is similar to Alex Ott's answer, with some additional info added.

    Per the recommendation, if batches contain mostly updates, CDF might not be effective.

    • To enable the CDF feature:
    %sql
    ALTER TABLE <table-name> SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
    • Perform any update/insert operation on the table
    • Use the table_changes() function to view the changes
    %sql 
    select * from table_changes('<table-name>', <start-commit-version>, <end-commit-version>)
    
    • To read the change feed as a stream
    event_read_stream = (
      spark.readStream
        .format("delta")
        .option("readChangeFeed", "true")
        .option("startingVersion", "latest")
        .table("event")  # table name
        .filter("_change_type != 'update_preimage'")
    )
    
    • Create the upsert function which merges the changes
    • Write stream to write the info
    (
      event_read_stream.writeStream
        .format("delta")
        .trigger(processingTime="2 seconds")  # or trigger(once=True) if the job runs as a one-shot batch
        .outputMode("append")
        .start()                              # wire in the upsert function via foreachBatch() and a checkpoint location as needed
    )
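
    Putting the pieces together for the original goal (writing only the merged changes out as ORC), a rough end-to-end sketch could pair the CDF read stream above with a foreachBatch writer similar to loadToLocation; the output path, checkpoint name and dropped metadata columns are assumptions:

    # Sketch only: append just the changed rows as ORC (paths and names are assumed)
    def writeChangesToOrc(df, batch_id):
      (
        df
        .drop('_change_type', '_commit_version', '_commit_timestamp')  # CDF metadata columns
        .repartition(1)
        .write
        .partitionBy('updateddate')
        .format("orc")
        .mode("append")
        .save('{}/event/changes'.format(adls_mount_path))
      )

    (
      event_read_stream  # the CDF read stream from above
        .writeStream
        .trigger(once=True)  # or processingTime, as above
        .option("checkpointLocation", "{}/checkpoints/event_changes".format(adls_mount_path))
        .foreachBatch(writeChangesToOrc)
        .start()
    )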