I am getting the error "Upload attempts for part num: 2 have already reached max limit of: 5, will throw exception and fail" when trying to close the SequenceFile writer. The full log of the exception is below:
16/12/30 19:47:01 INFO s3n.MultipartUploadOutputStream: uploadPart /mnt/s3/57b63810-c20a-438c-a73f-48d50e0be7d2-0001 94317523 bytes md5: 05ww/fe3pNni9Zvfm+l4Gg== md5hex: d39c30fdf7b7a4d9e2f59bdf9be9781a
16/12/30 19:47:12 INFO s3n.MultipartUploadOutputStream: uploadPart /mnt1/s3/57b63810-c20a-438c-a73f-48d50e0be7d2-0002 94317523 bytes md5: 05ww/fe3pNni9Zvfm+l4Gg== md5hex: d39c30fdf7b7a4d9e2f59bdf9be9781a
16/12/30 19:47:23 INFO s3n.MultipartUploadOutputStream: uploadPart /mnt/s3/57b63810-c20a-438c-a73f-48d50e0be7d2-0003 94317523 bytes md5: 05ww/fe3pNni9Zvfm+l4Gg== md5hex: d39c30fdf7b7a4d9e2f59bdf9be9781a
16/12/30 19:47:35 INFO s3n.MultipartUploadOutputStream: uploadPart /mnt1/s3/57b63810-c20a-438c-a73f-48d50e0be7d2-0004 94317523 bytes md5: 05ww/fe3pNni9Zvfm+l4Gg== md5hex: d39c30fdf7b7a4d9e2f59bdf9be9781a
16/12/30 19:47:46 INFO s3n.MultipartUploadOutputStream: uploadPart /mnt/s3/57b63810-c20a-438c-a73f-48d50e0be7d2-0005 94317523 bytes md5: 05ww/fe3pNni9Zvfm+l4Gg== md5hex: d39c30fdf7b7a4d9e2f59bdf9be9781a
16/12/30 19:47:57 ERROR s3n.MultipartUploadOutputStream: Upload attempts for part num: 2 have already reached max limit of: 5, will throw exception and fail
16/12/30 19:47:57 INFO s3n.MultipartUploadOutputStream: completeMultipartUpload error for key: output/part-20176
java.lang.IllegalStateException: Reached max limit of upload attempts for part
    at com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.spawnNewFutureIfNeeded(MultipartUploadOutputStream.java:362)
    at com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.uploadMultiParts(MultipartUploadOutputStream.java:422)
    at com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.close(MultipartUploadOutputStream.java:471)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108)
    at org.apache.hadoop.io.SequenceFile$Writer.close(SequenceFile.java:1290)
    ...
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$18.apply(RDD.scala:727)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$18.apply(RDD.scala:727)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/12/30 19:47:59 INFO s3n.MultipartUploadOutputStream: uploadPart error com.amazonaws.AbortedException:
16/12/30 19:48:18 INFO s3n.MultipartUploadOutputStream: uploadPart error com.amazonaws.AbortedException:
The log only tells me that all 5 upload retries for the part failed; it does not show the underlying cause. Has anyone seen this error before? What could be the reason for it?
I am writing the sequence files using my own implementation of a multi-output format:
import org.apache.hadoop.conf
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile
import org.apache.hadoop.io.SequenceFile.{Metadata, Writer}

class MultiOutputSequenceFileWriter(prefix: String, suffix: String) extends Serializable {
  // One SequenceFile.Writer per output folder, created lazily on first write
  private val writers = collection.mutable.Map[String, SequenceFile.Writer]()

  /**
   * @param pathKey    folder within prefix where the content will be written
   * @param valueKey   key of the data to be written
   * @param valueValue value of the data to be written
   */
  def write(pathKey: String, valueKey: Any, valueValue: Any) = {
    if (!writers.contains(pathKey)) {
      val path = new Path(prefix + "/" + pathKey + "/" + "part-" + suffix)
      val hadoopConf = new conf.Configuration()
      hadoopConf.setEnum("io.seqfile.compression.type", SequenceFile.CompressionType.NONE)
      val fs = FileSystem.get(hadoopConf)
      writers(pathKey) = SequenceFile.createWriter(hadoopConf, Writer.file(path),
        Writer.keyClass(valueKey.getClass()),
        Writer.valueClass(valueValue.getClass()),
        Writer.bufferSize(fs.getConf().getInt("io.file.buffer.size", 4096)), // 4 KB
        Writer.replication(fs.getDefaultReplication()),
        Writer.blockSize(1073741824), // 1 GB
        Writer.progressable(null),
        Writer.metadata(new Metadata()))
    }
    writers(pathKey).append(valueKey, valueValue)
  }

  // Closing the writers closes the underlying output streams, which is where
  // the multipart upload to S3 is completed (and where the exception is thrown)
  def close = writers.values.foreach(_.close())
}
I write the sequence files as follows:
...
rdd.mapPartitionsWithIndex { (p, it) =>
  val writer = new MultiOutputSequenceFileWriter("s3://bucket/output/", p.toString)
  for ((key1, key2, data) <- it) {
    ...
    writer.write(key1, key2, data)
    ...
  }
  writer.close
  Nil.iterator
}.foreach((x: Nothing) => ()) // to trigger the iterator
...
Note:
An AWS support engineer mentioned that at the time of the error there were a lot of requests hitting the bucket. The job retried each part upload the default number of times (5), and most probably all of the retries were throttled. I have since increased the number of retries with the following config parameter, added when submitting the job:
spark.hadoop.fs.s3.maxRetries=20
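For illustration, the same setting could also be applied when building the SparkConf in the driver instead of on the command line. This is just a sketch (the app name is made up), equivalent to passing --conf spark.hadoop.fs.s3.maxRetries=20 to spark-submit:

import org.apache.spark.{SparkConf, SparkContext}

// The "spark.hadoop." prefix makes Spark copy the property into the Hadoop
// Configuration, where the EMRFS S3 client reads fs.s3.maxRetries.
val sparkConf = new SparkConf()
  .setAppName("multi-output-sequence-files") // hypothetical app name
  .set("spark.hadoop.fs.s3.maxRetries", "20")
val sc = new SparkContext(sparkConf)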
Additionally, I have compressed the output so that fewer requests are made to S3. I have not seen the failure in several runs since making these changes.
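For reference, a minimal sketch of how compression could be enabled in the writer above instead of CompressionType.NONE. GzipCodec is just one possible codec; hadoopConf, path, valueKey and valueValue are the same names as in the writer class:

import org.apache.hadoop.io.SequenceFile
import org.apache.hadoop.io.SequenceFile.Writer
import org.apache.hadoop.io.compress.GzipCodec

// BLOCK compression batches many records per compressed block, which shrinks
// the output and therefore the number of upload requests sent to S3.
val writer = SequenceFile.createWriter(hadoopConf, Writer.file(path),
  Writer.keyClass(valueKey.getClass()),
  Writer.valueClass(valueValue.getClass()),
  Writer.compression(SequenceFile.CompressionType.BLOCK, new GzipCodec()))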