df
  .writeStream
  .trigger(Trigger.Once)
  .option(checkpointKey, checkpointVal) // e.g. "checkpointLocation" -> a checkpoint dir
  .foreachBatch { (batchDF: DataFrame, batchId: Long) => /* process the batch */ }
  .start()
This is the sample code I am running. I observe that Structured Streaming creates the offsets file right at the start of the micro-batch: checkpoints/offsets/3
Why does it not wait for foreachBatch to complete before writing the offsets to the checkpoint directory?
Each micro-batch works like a transaction.
When the Kafka source has new messages, the begin and end offsets for that micro-batch are written to the offsets folder first, as a write-ahead log.
Processing then starts.
If it succeeds, a commit file with the same micro-batch id is written into the commits folder. If it fails, the micro-batch is re-executed over the same offset range.
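Because the offsets file is a write-ahead log, a failed batch is replayed with the same batchId and the same offset range, so the foreachBatch body should be idempotent. A minimal sketch of one way to do that (the checkpoint path, output path, and partition-by-batchId scheme here are assumptions for illustration, not from the question):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

// Assumes `df` is a streaming DataFrame read from Kafka, as in the question.
val query = df.writeStream
  .trigger(Trigger.Once)
  .option("checkpointLocation", "checkpoints/") // offsets/ and commits/ live under here
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // By the time this runs, checkpoints/offsets/<batchId> already exists (the WAL).
    // Writing into a batchId-specific location with overwrite makes a replay
    // of the same batch produce the same result instead of duplicates.
    batchDF.write
      .mode("overwrite")
      .save(s"output/batch_id=$batchId") // hypothetical output path
    // Only after this function returns is checkpoints/commits/<batchId> written.
  }
  .start()
query.awaitTermination()
```

On restart after a failure, Structured Streaming finds an offsets file without a matching commits file and re-runs exactly that batch; the overwrite mode above makes the re-run harmless.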