
[Randomly appear][Spark ML ALS][AWS EMR] FileNotFoundException in checkpoint folder but file exists


I am running a scheduled (once a day) Spark application on AWS EMR. The application is a recommender algorithm based on spark.ml.recommendation.ALS, the data is located on AWS S3, and the application outputs recommendations for a group of users.

To make the iterative algorithm run robustly, I use Spark's checkpoint function, with the checkpoint folder set on AWS S3.

Sometimes everything works fine. But sometimes the Spark application fails to find a file in the checkpoint folder, even though the file actually exists. I don't know why.

Here is a typical error log:

19/10/30 13:46:01 WARN TaskSetManager: Lost task 5.0 in stage 873.0 (TID 12169, ip-10-79-9-182.us-west-2.compute.internal, executor 5): java.io.FileNotFoundException: No such file or directory: s3a://bucket-name/checkpoint/8f63442c-dd06-45d8-8e3a-ec30634b1a2f/rdd-2166/part-00005
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1642)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:521)
    at org.apache.spark.rdd.ReliableCheckpointRDD$.readCheckpointFile(ReliableCheckpointRDD.scala:292)
    at org.apache.spark.rdd.ReliableCheckpointRDD.compute(ReliableCheckpointRDD.scala:100)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)

I checked that s3a://bucket-name/checkpoint/8f63442c-dd06-45d8-8e3a-ec30634b1a2f/rdd-2166/part-00005 does exist on S3.

My detailed steps are as follows:

  1. create a checkpoint folder on s3;
  2. set spark's CheckpointDir to the folder just created;
  3. run algorithm;
  4. delete the checkpoint folder for clean up.

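Here is my Scala code:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.ml.recommendation.ALS

// step 1: create the checkpoint folder on S3
val pathString = "s3a://bucket-name/checkpoint"
val path = new Path(pathString)
val fileSystem = FileSystem.get(path.toUri, sparkContext.hadoopConfiguration)
fileSystem.mkdirs(path)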

// step 2: point Spark's checkpoint directory at that folder
sparkContext.setCheckpointDir(pathString)

// step 3: run the algorithm
// ... a lot of code that is not very relevant
val als = new ALS()
  .setRank(10)
  .setMaxIter(20)
  .setUserCol("userId")
  .setItemCol("clubId")
  .setRatingCol("rating")
  .setCheckpointInterval(10) // checkpoint every 10 iterations
  .setColdStartStrategy("drop")
  .setPredictionCol("prediction")
// ... more code that is not very relevant

// step 4: delete the checkpoint folder to clean up
fileSystem.delete(path, recursive = true)

Solution

  • S3 is eventually consistent. Sometimes 404s get cached in the load balancers if a client does a HEAD before the file is created. On subsequent HEAD/GET requests that 404 is then (a) returned and (b) refreshed in the cache, so it stays around.

    The S3A connector has had a lot of work recently to try to eliminate this problem (HADOOP-16490 and related), but that's not shipping yet. While it does a lot to eliminate the problem in the S3A connector, it may still be vulnerable to quirks in Spark's use of the code. Someone should review the checkpointing to make sure it creates files with overwrite=true.

    Meanwhile: if you use the Hadoop 3.2.x binaries and use S3Guard for consistent listings, it should know enough to retry here. You just might need to tune the retry interval to be a bit bigger, so that the URL is left alone long enough for the cache to clear.
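
    A minimal sketch of what that configuration might look like, assuming the S3A property names documented for Hadoop 3.2 (fs.s3a.metadatastore.impl, fs.s3a.s3guard.ddb.table, fs.s3a.retry.limit, fs.s3a.retry.interval). The object name, the table name and the numeric values are illustrative assumptions, not recommendations, and which retry settings actually govern this code path may differ between releases:

```scala
// Sketch only: S3A settings enabling S3Guard plus a wider retry interval.
// The table name and the numbers below are assumptions for illustration.
object S3aCheckpointSettings {
  val settings: Map[String, String] = Map(
    // Use the DynamoDB-backed S3Guard metadata store for consistent listings.
    "fs.s3a.metadatastore.impl" ->
      "org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore",
    "fs.s3a.s3guard.ddb.table" -> "my-s3guard-table", // hypothetical table name
    // Retry more times, with a longer pause between attempts.
    "fs.s3a.retry.limit" -> "10",
    "fs.s3a.retry.interval" -> "5s"
  )

  // Spark copies every "spark.hadoop.*" property into the Hadoop configuration,
  // so the same keys can be handed to a SparkConf with that prefix.
  def asSparkConf: Map[String, String] =
    settings.map { case (key, value) => s"spark.hadoop.$key" -> value }
}
```

    The resulting pairs could be applied with something like S3aCheckpointSettings.asSparkConf.foreach { case (k, v) => sparkConf.set(k, v) } before the SparkContext is created, where sparkConf is your own SparkConf instance.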

    Otherwise, try adding some 30-60s sleeps in your workflow between creating files and trying to rename or open them, to see if that helps.
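
For file operations in your own driver code, the sleep suggestion can be sketched as a small helper that pauses and tries again when a file is reported missing. The object RetryOnMissing and its parameters are hypothetical names introduced here. It cannot wrap the checkpoint reads Spark performs inside executor tasks, and the pause should be long, because every failed probe may refresh the cached 404:

```scala
import java.io.FileNotFoundException
import scala.annotation.tailrec

object RetryOnMissing {
  // Runs `action`. On FileNotFoundException it sleeps for `pauseMillis` and
  // tries again, up to `maxAttempts` attempts in total. Any other exception
  // propagates immediately, and the last FileNotFoundException is rethrown
  // once the attempts are used up.
  @tailrec
  def retryOnMissing[T](maxAttempts: Int, pauseMillis: Long)(action: () => T): T = {
    val outcome: Either[FileNotFoundException, T] =
      try Right(action())
      catch { case e: FileNotFoundException => Left(e) }

    outcome match {
      case Right(value) => value
      case Left(e) if maxAttempts <= 1 => throw e
      case Left(_) =>
        // Leave the URL alone for a while before probing it again.
        Thread.sleep(pauseMillis)
        retryOnMissing(maxAttempts - 1, pauseMillis)(action)
    }
  }
}
```

With the fileSystem from the question this might be called as RetryOnMissing.retryOnMissing(3, 30000L)(() => fileSystem.open(somePath)), where somePath stands for whatever file you created earlier in the workflow.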