
DeltaFileNotFoundException: No file found in the directory DataBricks


I would like to request your help.

I have been working with Databricks. We developed some scripts that run in streaming mode. Let's suppose we have two jobs running and writing data to one general local dataset (LDS). This means notebook1 and notebook2 write data to the same LDS.

Each notebook reads data from a different origin and writes it to the same LDS in a standard format. To avoid conflicts we partitioned the LDS.

In this case the LDS has one partition for notebook1 and another partition for notebook2.
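
Roughly, each notebook writes something like this (only a sketch; the real partition column, variable names and path differ, and the actual writes happen inside our foreachBatch function):

from pyspark.sql.functions import lit

# Sketch: each notebook appends to the same Delta path, but only into its
# own partition ("source" is an illustrative column name).
(batch_df
    .withColumn("source", lit("notebook1"))   # notebook2 writes lit("notebook2")
    .write
    .format("delta")
    .mode("append")
    .partitionBy("source")
    .save(LDS_PATH))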

This implementation has been working well for almost 5 months.

However, today we just faced the following error:

com.databricks.sql.transaction.tahoe.DeltaFileNotFoundException: No file found in the directory: dbfs:/mnt/streaming/streaming1/_delta_log.

I have been looking for a way to solve it, and the solutions I found are:

  1. Solution 1, which explains some reasons why this situation can happen and suggests either using a new checkpoint directory, or setting the Spark property spark.sql.files.ignoreMissingFiles to true in the cluster's Spark config (see the sketch after this list). Using a new checkpoint directory is not possible for us because of the requirements we have to satisfy: a new checkpoint would mean processing again all the data that has already been processed. Why? In summary, we get updates from a database that are saved into a Delta table containing the raw data, and that table is where we consume from, so creating a new checkpoint, or deleting the existing one, would mean consuming the whole table again. That only leaves us the option of applying spark.sql.files.ignoreMissingFiles. However, my question here is: if we set this property, would we be processing the data from the beginning, or would it resume from where the last checkpoint was?

  2. Solution 2: I found a similar case here, but I didn't fully understand it. They suggest changing the parent directory; we already have something similar to that and it does not solve our problem. They also suggest adding the directory in the start() option?
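
For reference, this is roughly what applying the property from Solution 1 would look like; the first line would go in the cluster's Spark config (one key/value per line under the cluster's advanced options), and the second is the session-level equivalent run from the notebook before the stream starts. This is only a sketch, we have not tried it yet:

# Cluster-level: add this line to the cluster's Spark config field:
#   spark.sql.files.ignoreMissingFiles true

# Session-level equivalent, run before starting the streaming query:
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")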

Our main streaming query looks like this:

from pyspark.sql.functions import expr

spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("maxFilesPerTrigger", 250) \
  .option("maxBytesPerTrigger", 536870912) \
  .option("failOnDataLoss", "true") \
  .load(DATA_PATH) \
  .filter(expr("_change_type not in ('delete', 'update_preimage')")) \
  .writeStream \
  .queryName(streamQueryName) \
  .foreachBatch(MainFunctionstoprocess) \
  .option("checkpointLocation", checkpointLocation) \
  .option("mergeSchema", "true") \
  .trigger(processingTime='1 seconds') \
  .start()

Does anyone have an idea how we could solve this problem without deleting the checkpoint, so we can resume from the last checkpoint where it failed, or some way to roll back to an earlier checkpoint so we only have to reprocess part of the data?


Solution

  • We had the same issue because some changes had been applied to the vacuuming thresholds across the workspace.

    Add spark.conf.set("spark.sql.files.ignoreMissingFiles", True) before executing the streaming command.

    ignoreMissingFiles will NOT process the data from the beginning, and it will use the current checkpoint directory. However, it will take a bit longer to process the data.

    After a few runs, and after making sure that you don't have unplanned deletions or vacuuming operations on the Delta table, you can remove the ignoreMissingFiles line.

    Edit:

    Checkpoint deletion doesn't always mean processing the data from the beginning. If you deleted your checkpoint directory, add a starting timestamp option to your readStream: .option("startingTimestamp", '2023-12-01T00:00:00.000Z')
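
    In that case the read side of the query from the question would look roughly like this (a sketch; the timestamp is only an example and should be a point you can afford to reprocess from):

    # Only relevant if the checkpoint directory was deleted: startingTimestamp
    # tells the change feed where to start instead of replaying the whole table.
    df = spark.readStream.format("delta") \
        .option("readChangeFeed", "true") \
        .option("startingTimestamp", "2023-12-01T00:00:00.000Z") \
        .option("maxFilesPerTrigger", 250) \
        .option("failOnDataLoss", "true") \
        .load(DATA_PATH)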