apache-spark, caching, apache-spark-sql, apache-spark-dataset, spark-avro

FileNotFoundException: Spark save fails. Cannot clear cache from Dataset[T] avro


I get the following error when saving a DataFrame in Avro for a second time: if I delete sub_folder/part-00000-XXX-c000.avro after saving and then try to save the same dataset again, the save fails with:

FileNotFoundException: File /.../main_folder/sub_folder/part-00000-3e7064c0-4a82-424c-80ca-98ce75766972-c000.avro does not exist. It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
  • If I delete not only from sub_folder but also from main_folder, then the problem doesn't happen, but I can't afford to do that.
  • The problem doesn't actually happen when saving the dataset in any other format.
  • Saving an empty dataset does not cause the error.

The error message suggests that the tables need to be refreshed, but as the output of sparkSession.catalog.listTables().show() shows, there are no tables to refresh:

+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
+----+--------+-----------+---------+-----------+

The previously saved dataframe looks like this. The application is supposed to update it:

+--------------------+--------------------+
|              Col1  |               Col2 |
+--------------------+--------------------+
|[123456, , ABC, [...|[[v1CK, RAWNAME1_,..|
|[123456, , ABC, [...|[[BG8M, RAWNAME2_...|
+--------------------+--------------------+

For me this is clearly a cache problem. However, all attempts to clear the cache have failed:

dataset.write
  .format("avro")
  .option("path", path)
  .mode(SaveMode.Overwrite) // Any save mode gives the same error
  .save()

// Moving this either before or after saving doesn't help.
sparkSession.catalog.clearCache()

// This will not un-persist any cached data that is built upon this Dataset.
dataset.cache().unpersist()
dataset.unpersist()

And this is how I read the dataset:

private def doReadFromPath[T <: SpecificRecord with Product with Serializable: TypeTag: ClassTag](path: String): Dataset[T] = {

    val df = sparkSession.read
      .format("avro")
      .load(path)
      .select("*")

    df.as[T]
  }
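
For reference, this is roughly how the method is invoked; MyRecord here is just a placeholder for the actual Avro-generated record type:

    // MyRecord is a placeholder for the real class that extends
    // SpecificRecord with Product with Serializable.
    val dataset: Dataset[MyRecord] = doReadFromPath[MyRecord]("/DATA/XXX/main_folder/sub_folder")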

Finally, here is the stack trace (thanks a lot for your help!):

ERROR [task-result-getter-3] (Logging.scala:70) - Task 0 in stage 9.0 failed 1 times; aborting job
ERROR [main] (Logging.scala:91) - Aborting job 150de02a-ac6a-4d42-824d-5db44a98c19a.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 11, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:254)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: File file:/DATA/XXX/main_folder/sub_folder/part-00000-3e7064c0-4a82-424c-80ca-98ce75766972-c000.avro does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:241)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:239)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:245)
    ... 10 more

Solution

  • Reading from a location and writing back into that same location is what gives this issue. It was also discussed in this forum, along with my answer there.

    The message below in the error is misleading; the actual issue is the read from and write to the same location:

    You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL
    

    I am giving an example different from yours (it uses Parquet; in your case it is Avro).

    I have 2 options for you.

    Option 1 (cache and show will work, like below):

    import org.apache.spark.sql.functions._
    import spark.implicits._ // needed for toDS on a local Seq when not in the spark shell

    val df = Seq((1, 10), (2, 20), (3, 30)).toDS.toDF("sex", "date")

    df.show(false)

    df.repartition(1).write.format("parquet").mode("overwrite").save(".../temp") // save it
    val df1 = spark.read.format("parquet").load(".../temp") // read it back again

    val df2 = df1.withColumn("cleanup", lit("Rod want to cleanup")) // like you said you want to clean it up.

    // THE 2 STEPS BELOW ARE IMPORTANT: `cache` and a light action such as show(2),
    // without which the FileNotFoundException will come.
    df2.cache // cache to avoid FileNotFoundException
    df2.show(2, false) // light action to avoid FileNotFoundException
    // or println(df2.count) // action

    df2.repartition(1).write.format("parquet").mode("overwrite").save(".../temp")
    println("Rod saved in the same directory where he read it from; the final records he saved after clean up are:")
    df2.show(false)
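
    The same cache-then-act pattern should carry over to your Avro case; here is a minimal sketch of it (the path and the placeholder column are assumptions, not taken from your code):

    // Assumed path, following the question's folder layout; adjust it to the real one.
    val avroPath = "/DATA/XXX/main_folder/sub_folder"

    // Read the existing Avro data back.
    val existing = spark.read.format("avro").load(avroPath)

    // Placeholder update step; the real application applies its own transformation.
    val updated = existing.withColumn("updated", lit(true))

    // Cache and run a full action so every partition is materialized
    // before the files underneath are overwritten.
    updated.cache()
    println(updated.count())

    // Overwriting the same location should no longer hit the FileNotFoundException.
    updated.write.format("avro").mode("overwrite").save(avroPath)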
    

    Option 2:

    1) Save the DataFrame to a different avro folder.

    2) Delete the old avro folder.

    3) Finally, rename the newly created avro folder to the old name. This will work; a sketch of this approach follows below.
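
    A minimal sketch of Option 2 using the Hadoop FileSystem API (the folder names and the `dataset` value are assumptions; adjust them to your real layout):

    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.sql.SaveMode

    // Assumed folders following the question's layout; `dataset` stands for
    // the updated Dataset[T] that needs to be written back.
    val oldDir = new Path("/DATA/XXX/main_folder/sub_folder")
    val tmpDir = new Path("/DATA/XXX/main_folder/sub_folder_tmp")

    // 1) Save to a different folder first.
    dataset.write.format("avro").mode(SaveMode.Overwrite).save(tmpDir.toString)

    // 2) Delete the old avro folder.
    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    fs.delete(oldDir, true)

    // 3) Rename the newly created folder to the old name.
    fs.rename(tmpDir, oldDir)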