Tags: apache-spark, azure-data-lake, delta-lake

Apache Spark/Azure Data Lake Storage - Process the file exactly once, tag the file as processed


I have an Azure Data Lake Storage container which acts as a landing area for JSON files to be processed by Apache Spark.

There are tens of thousands of small (up to a few MB) files there. The Spark code reads these files on a regular basis and performs some transformations.

I want the files to be read exactly once and the Spark script to be idempotent. How do I ensure that the files are not read again and again? How do I do it in an efficient manner?

I read the data this way:

spark.read.json("/mnt/input_location/*.json")

I thought about the following approaches:

  1. Create a Delta table with the file names that have already been processed and run the EXCEPT transformation on the input DataFrame
  2. Move the processed files to a different location (or rename them). I would rather not do that: if I ever need to reprocess the data, I would have to run the rename again, and this operation takes a long time.
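
Approach 1 could be sketched roughly as follows. This is only an illustration under assumptions: the Delta table path and the `file_name` column are hypothetical placeholders, and the table is assumed to already exist. An anti join on the file name is used here in place of a literal EXCEPT. Note that Spark still has to list and read every input file before the join filters rows out.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.input_file_name

val spark = SparkSession.builder().getOrCreate()

// Hypothetical placeholder: a Delta table with a single string column `file_name`
val processedFilesPath = "/mnt/processed_files"

// Tag every input row with the file it came from
val input = spark.read
  .json("/mnt/input_location/*.json")
  .withColumn("file_name", input_file_name())

val alreadyProcessed = spark.read.format("delta").load(processedFilesPath)

// Keep only rows whose source file is not yet recorded as processed
val newRows = input.join(alreadyProcessed, Seq("file_name"), "left_anti")

// ... transformations and the write of newRows go here ...

// Record the file names last, so files from a failed run are picked up again next time
newRows.select("file_name").distinct()
  .write.format("delta").mode("append").save(processedFilesPath)
```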

I hope there is a better way. Please suggest something.


Solution

  • You can use a Structured Streaming job with checkpointing enabled and the Trigger.Once trigger.

    The checkpoint files of that job keep track of the JSON files that the job has already consumed. In addition, the Trigger.Once trigger makes the streaming job behave like a batch job: it processes all data available at start-up and then stops.

    There is a nice article from Databricks that explains "Why Streaming and RunOnce is Better than Batch".

    Your structured streaming job could look like below:

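    import org.apache.spark.sql.streaming.Trigger

    val checkpointLocation = "/path/to/checkpoints"
    val pathToJsonFiles = "/mnt/input_location/"

    // By default, file-based streaming sources need an explicit schema; jsonSchema must be defined beforehand
    val streamDF = spark.readStream.format("json").schema(jsonSchema).load(pathToJsonFiles)
    
    val query = streamDF
      .[...] // apply your processing
      .writeStream
      .format("console") // change sink format accordingly
      .option("checkpointLocation", checkpointLocation) // records which input files were already processed
      .trigger(Trigger.Once()) // process everything currently available, then stop
      .start()
    
    // Block until the single run has finished
    query.awaitTermination()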
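
    The `jsonSchema` value used above is not defined in the snippet. A minimal sketch of how it could be declared is shown here; the field names and types are purely hypothetical and must be replaced with the actual layout of your JSON files.

    ```scala
    import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

    // Hypothetical fields for illustration only; adjust to match your JSON documents
    val jsonSchema = StructType(Seq(
      StructField("id", StringType, nullable = false),
      StructField("event_time", TimestampType, nullable = true),
      StructField("payload", StringType, nullable = true)
    ))
    ```

    If writing the schema by hand is impractical, one option is to take it from a one-off batch read of a representative file, for example `spark.read.json(...)` followed by `.schema`, and reuse it in the streaming job.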