Tags: apache-spark, amazon-s3, emr

Multipart upload error to S3 from Spark


I am getting the error "Upload attempts for part num: 2 have already reached max limit of: 5, will throw exception and fail" when trying to close a SequenceFile writer. The full exception log is below:

16/12/30 19:47:01 INFO s3n.MultipartUploadOutputStream: uploadPart /mnt/s3/57b63810-c20a-438c-a73f-48d50e0be7d2-0001 94317523 bytes md5: 05ww/fe3pNni9Zvfm+l4Gg== md5hex: d39c30fdf7b7a4d9e2f59bdf9be9781a
16/12/30 19:47:12 INFO s3n.MultipartUploadOutputStream: uploadPart /mnt1/s3/57b63810-c20a-438c-a73f-48d50e0be7d2-0002 94317523 bytes md5: 05ww/fe3pNni9Zvfm+l4Gg== md5hex: d39c30fdf7b7a4d9e2f59bdf9be9781a
16/12/30 19:47:23 INFO s3n.MultipartUploadOutputStream: uploadPart /mnt/s3/57b63810-c20a-438c-a73f-48d50e0be7d2-0003 94317523 bytes md5: 05ww/fe3pNni9Zvfm+l4Gg== md5hex: d39c30fdf7b7a4d9e2f59bdf9be9781a
16/12/30 19:47:35 INFO s3n.MultipartUploadOutputStream: uploadPart /mnt1/s3/57b63810-c20a-438c-a73f-48d50e0be7d2-0004 94317523 bytes md5: 05ww/fe3pNni9Zvfm+l4Gg== md5hex: d39c30fdf7b7a4d9e2f59bdf9be9781a
16/12/30 19:47:46 INFO s3n.MultipartUploadOutputStream: uploadPart /mnt/s3/57b63810-c20a-438c-a73f-48d50e0be7d2-0005 94317523 bytes md5: 05ww/fe3pNni9Zvfm+l4Gg== md5hex: d39c30fdf7b7a4d9e2f59bdf9be9781a
16/12/30 19:47:57 ERROR s3n.MultipartUploadOutputStream: Upload attempts for part num: 2 have already reached max limit of: 5, will throw exception and fail
16/12/30 19:47:57 INFO s3n.MultipartUploadOutputStream: completeMultipartUpload error for key: output/part-20176
java.lang.IllegalStateException: Reached max limit of upload attempts for part
    at com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.spawnNewFutureIfNeeded(MultipartUploadOutputStream.java:362)
    at com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.uploadMultiParts(MultipartUploadOutputStream.java:422)
    at com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.close(MultipartUploadOutputStream.java:471)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108)
    at org.apache.hadoop.io.SequenceFile$Writer.close(SequenceFile.java:1290)
   ...
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$18.apply(RDD.scala:727)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$18.apply(RDD.scala:727)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/12/30 19:47:59 INFO s3n.MultipartUploadOutputStream: uploadPart error com.amazonaws.AbortedException: 
16/12/30 19:48:18 INFO s3n.MultipartUploadOutputStream: uploadPart error com.amazonaws.AbortedException: 

The log only tells me that all 5 retries failed; it does not say what caused them to fail. Has anyone seen this error before? What could be the reason for it?

I am writing the sequence files using my own implementation of a multi-output format:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile
import org.apache.hadoop.io.SequenceFile.{Metadata, Writer}

class MultiOutputSequenceFileWriter(prefix: String, suffix: String) extends Serializable {
  // One writer per sub-folder (pathKey), created lazily on the first write to it.
  private val writers = collection.mutable.Map[String, SequenceFile.Writer]()

  /**
    * @param pathKey    folder within prefix where the content will be written
    * @param valueKey   key of the data to be written
    * @param valueValue value of the data to be written
    */
  def write(pathKey: String, valueKey: Any, valueValue: Any): Unit = {
    if (!writers.contains(pathKey)) {
      val path = new Path(prefix + "/" + pathKey + "/" + "part-" + suffix)
      val hadoopConf = new Configuration()
      hadoopConf.setEnum("io.seqfile.compression.type", SequenceFile.CompressionType.NONE)
      val fs = FileSystem.get(hadoopConf)
      writers(pathKey) = SequenceFile.createWriter(hadoopConf, Writer.file(path),
        Writer.keyClass(valueKey.getClass()),
        Writer.valueClass(valueValue.getClass()),
        Writer.bufferSize(fs.getConf().getInt("io.file.buffer.size", 4096)), // 4 KB
        Writer.replication(fs.getDefaultReplication()),
        Writer.blockSize(1073741824), // 1 GB
        Writer.progressable(null),
        Writer.metadata(new Metadata()))
    }
    writers(pathKey).append(valueKey, valueValue)
  }

  def close(): Unit = writers.values.foreach(_.close())
}

I am trying to write the sequence file as follows:

...
rdd.mapPartitionsWithIndex { (p, it) =>
  val writer = new MultiOutputSequenceFileWriter("s3://bucket/output/", p.toString)
  for ((key1, key2, data) <- it) {
    ...
    writer.write(key1, key2, data)
    ...
  }
  writer.close()
  Nil.iterator
}.foreach((x: Nothing) => ()) // foreach is an action; it forces the lazy partition iterators
...
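
For reference, the same writes can be expressed without the empty-iterator trick by using foreachPartition, which is an action, together with TaskContext.getPartitionId for the part suffix. This is only a sketch of that alternative, not the original job code:

import org.apache.spark.TaskContext

rdd.foreachPartition { it =>
  // One writer per partition; the partition id replaces the index from mapPartitionsWithIndex.
  val writer = new MultiOutputSequenceFileWriter("s3://bucket/output/", TaskContext.getPartitionId().toString)
  try {
    for ((key1, key2, data) <- it) {
      writer.write(key1, key2, data)
    }
  } finally {
    writer.close() // close (and therefore upload) even if an append fails
  }
}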

Note:

  • I am getting the exception when closing the writers. The writers appear to buffer content locally and upload it to S3 on close, which would explain why the failure surfaces at that point.
  • I retried the same job with the same input two more times. The first re-run had no errors, but the second had three. Could this just be a transient issue in S3?
  • The part file that failed is not present in S3 (see the sketch after this list for one way to check whether an in-progress multipart upload was left behind).
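
On the last point: a part that failed mid-upload would exist only as an in-progress multipart upload, which does not show up as an object. A minimal sketch for listing such uploads with the AWS SDK for Java, with placeholder bucket and prefix names:

import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.ListMultipartUploadsRequest
import scala.collection.JavaConverters._

val s3 = AmazonS3ClientBuilder.defaultClient()
// "bucket" and "output/" are placeholders for the real bucket name and key prefix.
val request = new ListMultipartUploadsRequest("bucket").withPrefix("output/")
s3.listMultipartUploads(request).getMultipartUploads.asScala
  .foreach(u => println(s"${u.getKey} started at ${u.getInitiated}"))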

Solution

  • An AWS support engineer mentioned that, at the time of the error, there was a high request rate on the bucket. The job was retrying the default number of times (5), and most likely all of the retries were throttled. I have since increased the number of retries with the following config parameter, added when submitting the job:

    spark.hadoop.fs.s3.maxRetries=20

    Additionally, I compressed the output so that fewer requests go to S3; both changes are sketched below. I have not seen failures in several runs since making them.
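
    A minimal sketch of both changes, with the retry limit set programmatically via SparkConf (equivalent to passing --conf at submit time) and block compression enabled in the writer; the app name and codec are placeholders, not taken from the original job:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.hadoop.io.SequenceFile
    import org.apache.hadoop.io.SequenceFile.Writer
    import org.apache.hadoop.io.compress.DefaultCodec

    val sparkConf = new SparkConf()
      .setAppName("multi-output-sequence-files")   // placeholder app name
      .set("spark.hadoop.fs.s3.maxRetries", "20")  // raise the EMRFS part-upload retry limit from the default 5
    val sc = new SparkContext(sparkConf)

    // In MultiOutputSequenceFileWriter, pass this option to SequenceFile.createWriter
    // instead of setting io.seqfile.compression.type to NONE:
    val compressOpt: Writer.Option =
      Writer.compression(SequenceFile.CompressionType.BLOCK, new DefaultCodec())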