I get the following error when saving a dataframe in avro for a second time. If I delete sub_folder/part-00000-XXX-c000.avro after saving, and then try to save the same dataset, I get the following:
FileNotFoundException: File /.../main_folder/sub_folder/part-00000-3e7064c0-4a82-424c-80ca-98ce75766972-c000.avro does not exist. It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
If I delete the files not only from sub_folder, but also from main_folder, then the problem doesn't happen, but I can't afford that. The error message suggests that the tables need to be refreshed, but as the output of sparkSession.catalog.listTables().show() shows, there are no tables to refresh:
+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
+----+--------+-----------+---------+-----------+
The previously saved dataframe looks like this. The application is supposed to update it:
+--------------------+--------------------+
| Col1 | Col2 |
+--------------------+--------------------+
|[123456, , ABC, [...|[[v1CK, RAWNAME1_,..|
|[123456, , ABC, [...|[[BG8M, RAWNAME2_...|
+--------------------+--------------------+
For me this is clearly a cache problem. However, all attempts at clearing the cache have failed:
dataset.write
  .format("avro")
  .option("path", path)
  .mode(SaveMode.Overwrite) // Any save mode gives the same error
  .save()
// Moving this either before or after saving doesn't help.
sparkSession.catalog.clearCache()
// This will not un-persist any cached data that is built upon this Dataset.
dataset.cache().unpersist()
dataset.unpersist()
And this is how I read the dataset:
private def doReadFromPath[T <: SpecificRecord with Product with Serializable: TypeTag: ClassTag](path: String): Dataset[T] = {
  val df = sparkSession.read
    .format("avro")
    .load(path)
    .select("*")
  df.as[T]
}
Finally, this is the stack trace. Thanks a lot for your help!
ERROR [task-result-getter-3] (Logging.scala:70) - Task 0 in stage 9.0 failed 1 times; aborting job
ERROR [main] (Logging.scala:91) - Aborting job 150de02a-ac6a-4d42-824d-5db44a98c19a.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 11, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:254)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: File file:/DATA/XXX/main_folder/sub_folder/part-00000-3e7064c0-4a82-424c-80ca-98ce75766972-c000.avro does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:241)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:239)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:245)
... 10 more
The message below, which appears in the error, is misleading. The actual issue is that you are reading from and writing to the same location.
You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL
I am giving a different example from yours (it uses parquet, where your case uses avro).
I have 2 options for you.
Option 1 (cache and show will work as below):
import org.apache.spark.sql.functions._
import spark.implicits._ // needed for toDS outside spark-shell; assumes a SparkSession named spark
val df = Seq((1, 10), (2, 20), (3, 30)).toDS.toDF("sex", "date")
df.show(false)
df.repartition(1).write.format("parquet").mode("overwrite").save(".../temp") // save it
val df1 = spark.read.format("parquet").load(".../temp") // read it back
val df2 = df1.withColumn("cleanup", lit("Rod want to cleanup")) // the clean-up step you mentioned
// The next two steps are important: cache, then force an action so the data is materialized.
// Without them, the overwrite below fails with FileNotFoundException.
df2.cache // mark for caching (lazy until an action runs)
df2.show(2, false) // light action that populates the cache
// or println(df2.count) // action that touches every partition
df2.repartition(1).write.format("parquet").mode("overwrite").save(".../temp")
println("Rod saved to the same directory he read from; the final records after clean-up are:")
df2.show(false)
Option 2:
1) Save the DataFrame to a different avro folder.
2) Delete the old avro folder.
3) Finally, rename the newly created avro folder to the old name.
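The three steps of Option 2 could be sketched roughly as follows, using the Hadoop FileSystem API. This is only a sketch under assumptions: the helper name overwriteViaTempDir and the "_tmp" suffix are made up for illustration, the "avro" format assumes the spark-avro package is on the classpath, and the target is assumed to have a parent directory.

```scala
import java.io.IOException

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

// Hypothetical helper (not part of Spark): replaces the contents of `target`
// with `df`, even when `df` was lazily read from `target` itself.
def overwriteViaTempDir(spark: SparkSession,
                        df: DataFrame,
                        target: String,
                        format: String = "avro"): Unit = {
  val targetPath = new Path(target)
  // Sibling staging directory; the "_tmp" suffix is an arbitrary choice.
  val tmpPath = new Path(targetPath.getParent, targetPath.getName + "_tmp")
  val fs = targetPath.getFileSystem(spark.sparkContext.hadoopConfiguration)

  // 1) Write to the staging directory. The files under `target` still exist,
  //    so the lazy read behind `df` can complete.
  df.write.format(format).mode(SaveMode.Overwrite).save(tmpPath.toString)

  // 2) Delete the old directory (second argument = recursive).
  fs.delete(targetPath, true)

  // 3) Rename the staging directory to the original name.
  if (!fs.rename(tmpPath, targetPath)) {
    throw new IOException(s"Could not rename $tmpPath to $targetPath")
  }
}
```

It would be called as overwriteViaTempDir(spark, df2, ".../temp", "parquet"). Note that the target does not exist for a short moment between steps 2 and 3, so concurrent readers of that path could fail during that window.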