A pipeline runs every 20 minutes pushing data to ADLS Gen2 storage in ORC format.
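Note that DataStreamReader.schema() expects a StructType or a DDL-formatted string rather than a schema file path, so the snippets below use an event_schema built from my-schema.json. A minimal sketch, assuming the file holds a Spark schema in JSON form at a hypothetical DBFS path:

import json
from pyspark.sql import functions as F
from pyspark.sql.types import StructType

# Parse the JSON schema file into the StructType that .schema() expects (path is hypothetical)
with open('/dbfs/mnt/path/from/adls/to/my-schema.json') as schema_file:
    event_schema = StructType.fromJson(json.load(schema_file))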
event_readstream = (
    spark.readStream
        .format("orc")
        .schema(event_schema)   # StructType parsed from my-schema.json (see sketch above)
        .load('/mnt/path/from/adls/to/orc/files/')
)
...
def upsertEventToDeltaTable(df, batch_id):
    # Clean up the micro-batch and derive the partition column before merging
    input_df = (
        df
            .drop('address_line3')
            .dropDuplicates(['primaryKey'])
            .withColumn(partition_column, F.date_format(F.col('updateddate'), 'yyyy').cast('int'))
    )
    input_df.createOrReplaceTempView("event_df_updates")
    # Upsert the micro-batch into the events Delta table
    input_df._jdf.sparkSession().sql("""
        MERGE INTO events dbEvent
        USING event_df_updates dfEvent
        ON dbEvent.primaryKey = dfEvent.primaryKey
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
event_writestream = (
    event_readstream
        .drop('address_line3')   # some column to be dropped
        .repartition(1)
        .writeStream
        .trigger(once=True)
        .format("delta")
        .option("checkpointLocation", "{}/checkpoints/{}".format(adls_mount_path, mytable))
        .foreachBatch(upsertEventToDeltaTable)
        .start()
)
Point #2: forEachBatch() from a second writeStream, with checkpoint options enabled, lands the same data as ORC files in another location:
def loadToLocation(df, batch_id):
    (
        df
            .repartition(1)
            .write
            .partitionBy('updateddate')
            .format("orc")
            .mode("append")
            .save('{}/event/data'.format(adls_mount_path))
    )
location_writestream = (
    event_readstream   # the same read stream is used here
        .writeStream
        .trigger(once=True)
        .option("checkpointLocation", "{}/checkpoints/event".format(adls_mount_path))
        .foreachBatch(loadToLocation)
        .start()
)
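Since both queries run with trigger(once=True), it can help, assuming this runs as a scheduled job, to block until both have drained the available input before the job exits:

# Wait for both run-once streaming queries to finish processing
event_writestream.awaitTermination()
location_writestream.awaitTermination()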
Question:
In point #2 above, instead of using the readStream that reads from the ORC files, can I create a new readStream from the Delta table path, like below?
deltatbl_event_readstream = (
    spark.readStream
        .format("delta")
        .load("/mnt/delta/myadlsaccnt/user_events")   # my delta table location
)
(
    deltatbl_event_readstream
        .writeStream
        .trigger(once=True)
        .option("checkpointLocation", "{}/checkpoints/event".format(adls_mount_path))
        .foreachBatch(loadToLocation)   # re-using the same loadToLocation method defined earlier
        .start()
)
I came across a Data+AI Summit session that has a demo for exactly this scenario.
In my case each batch is >90% new rows with few updates, so I can't use this option, but it might help others.
The below is similar to Alex Ott's answer, with some additional info added.
Per the recommendation, if batches consist mostly of updates, CDF might not be effective.
%sql
ALTER TABLE <table-name> SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
Use the table_changes() function to view the changes:

%sql
SELECT * FROM table_changes('<table-name>', <start-commit-version>, <end-commit-version>)
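The same change feed can also be read as a batch DataFrame from PySpark; readChangeFeed and startingVersion are the documented CDF read options, and the table name here is the same placeholder as in the SQL above:

changes_df = (
    spark.read
        .format("delta")
        .option("readChangeFeed", "true")
        .option("startingVersion", 1)   # or .option("startingTimestamp", "<timestamp>")
        .table("<table-name>")
)

The streaming read below uses the same options via readStream.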
event_read_stream = (
    spark.readStream
        .format("delta")
        .option("readChangeFeed", "true")
        .option("startingVersion", "latest")
        .table("event")   # table name
        .filter("_change_type != 'update_preimage'")
)
(
    event_read_stream.writeStream
        .format("delta")
        .trigger(processingTime="2 seconds")   # use trigger(once=True) if the job should run once
        .outputMode("append")
        .start()   # a checkpointLocation option and a target path/table still need to be supplied here
)
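For completeness, a sketch of wiring this change-feed stream back to the loadToLocation sink from the question, with a hypothetical checkpoint path:

(
    event_read_stream
        .writeStream
        .trigger(once=True)
        .option("checkpointLocation", "{}/checkpoints/event_cdf".format(adls_mount_path))   # hypothetical checkpoint path
        .foreachBatch(loadToLocation)   # re-using the helper defined earlier to land ORC files
        .start()
)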