I have an Azure Data Lake Storage container which acts as a landing area for JSON files to be processed by Apache Spark.
There are tens of thousands of small files (up to a few MB each) there. The Spark code reads these files on a regular basis and performs some transformations.
I want the files to be read exactly once and the Spark script to be idempotent. How do I ensure that the files are not read again and again? How do I do it in an efficient manner?
I read the data this way:
spark.read.json("/mnt/input_location/*.json")
I have thought about a few approaches, but I hope there is a better way. Please suggest something.
You can use a Structured Streaming job with checkpointing enabled and a Trigger.Once trigger.
The checkpoint files of that job will keep track of which JSON files have already been consumed. In addition, the Trigger.Once trigger makes the streaming job behave like a batch job: it processes all the data available at start time and then stops.
There is a nice article from Databricks which explains "Why Streaming and RunOnce is Better than Batch".
Your Structured Streaming job could look like this:
import org.apache.spark.sql.streaming.Trigger

val checkpointLocation = "/path/to/checkpoints"
val pathToJsonFiles = "/mnt/input_location/"

// streaming file sources need the schema up front, so jsonSchema must be defined
val streamDF = spark.readStream.format("json").schema(jsonSchema).load(pathToJsonFiles)

val query = streamDF
  .[...] // apply your processing
  .writeStream
  .format("console") // change the sink format accordingly
  .option("checkpointLocation", checkpointLocation)
  .trigger(Trigger.Once)
  .start()

query.awaitTermination()
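Note that file-based streaming sources require an explicit schema unless spark.sql.streaming.schemaInference is enabled. A minimal sketch of how you could obtain jsonSchema, assuming your files share a consistent structure (the field names below are hypothetical placeholders):
import org.apache.spark.sql.types._

// Option 1: infer the schema once from a sample batch read of the existing files
val jsonSchema = spark.read.json("/mnt/input_location/*.json").schema

// Option 2: define it explicitly (hypothetical fields, adjust to your data)
// val jsonSchema = StructType(Seq(
//   StructField("id", StringType),
//   StructField("payload", StringType),
//   StructField("event_time", TimestampType)
// ))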