Tags: java, multithreading, amazon-web-services, apache-spark, elastic-map-reduce

Spark possible race condition in driver


I have a Spark job that processes several folders on S3 per run and stores its state in DynamoDB. In other words: we run the job once per day, it looks for new folders added by another job, transforms them one by one, and writes state to DynamoDB. Here's rough pseudocode:

object App {
  def main(args: Array[String]): Unit = {
    val allFolders = S3Folders.list()                                     // lists folders on S3
    val foldersToProcess = DynamoDBState.getFoldersToProcess(allFolders)  // filters them against DynamoDB state
    Transformer.run(foldersToProcess)
  }
}

object Transformer {
  def run(folders: List[String]): Unit = {
    val sc = new SparkContext()   // note: created only after the state fetching above has finished
    folders.foreach(process(sc, _))
  }

  def process(sc: SparkContext, folder: String): Unit = ???  // transform and write to S3
}

This approach works well if S3Folders.list() returns a relatively small number of folders (up to a few thousand). If it returns more (4-8K), we very often see the following error (which at first glance has nothing to do with Spark):

17/10/31 08:38:20 ERROR ApplicationMaster: User class threw exception: shadeaws.SdkClientException: Failed to sanitize XML document destined for handler class shadeaws.services.s3.model.transform.XmlResponsesSaxParser$ListObjectsV2Handler
shadeaws.SdkClientException: Failed to sanitize XML document destined for handler class shadeaws.services.s3.model.transform.XmlResponsesSaxParser$ListObjectsV2Handler
        at shadeaws.services.s3.model.transform.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:214)
        at shadeaws.services.s3.model.transform.XmlResponsesSaxParser.parseListObjectsV2Response(XmlResponsesSaxParser.java:315)
        at shadeaws.services.s3.model.transform.Unmarshallers$ListObjectsV2Unmarshaller.unmarshall(Unmarshallers.java:88)
        at shadeaws.services.s3.model.transform.Unmarshallers$ListObjectsV2Unmarshaller.unmarshall(Unmarshallers.java:77)
        at shadeaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:62)
        at shadeaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:31)
        at shadeaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:70)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1553)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1271)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1055)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
        at shadeaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
        at shadeaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
        at shadeaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4247)
        at shadeaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4194)
        at shadeaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4188)
        at shadeaws.services.s3.AmazonS3Client.listObjectsV2(AmazonS3Client.java:865)
        at me.chuwy.transform.S3Folders$.com$chuwy$transform$S3Folders$$isGlacierified(S3Folders.scala:136)
        at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
        at scala.collection.TraversableLike$class.filterNot(TraversableLike.scala:267)
        at scala.collection.AbstractTraversable.filterNot(Traversable.scala:104)
        at me.chuwy.transform.S3Folders$.list(S3Folders.scala:112)
        at me.chuwy.transform.Main$.main(Main.scala:22)
        at me.chuwy.transform.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)
Caused by: shadeaws.AbortedException:
        at shadeaws.internal.SdkFilterInputStream.abortIfNeeded(SdkFilterInputStream.java:53)
        at shadeaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:81)
        at shadeaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
        at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
        at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
        at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
        at java.io.InputStreamReader.read(InputStreamReader.java:184)
        at java.io.BufferedReader.read1(BufferedReader.java:210)
        at java.io.BufferedReader.read(BufferedReader.java:286)
        at java.io.Reader.read(Reader.java:140)
        at shadeaws.services.s3.model.transform.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:186)
        ... 36 more

For a large number of folders (~20K) this happens every time and the job cannot start.

Previously we had a very similar, but much more frequent, error when getFoldersToProcess did a GetItem for every folder from allFolders and therefore took much longer:

17/09/30 14:46:07 ERROR ApplicationMaster: User class threw exception: shadeaws.AbortedException:
shadeaws.AbortedException:
        at shadeaws.internal.SdkFilterInputStream.abortIfNeeded(SdkFilterInputStream.java:51)
        at shadeaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:71)
        at shadeaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
        at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:489)
        at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:126)
        at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:215)
        at com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1240)
        at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:802)
        at shadeaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:109)
        at shadeaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:43)
        at shadeaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:70)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1503)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1226)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
        at shadeaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
        at shadeaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
        at shadeaws.services.dynamodbv2.AmazonDynamoDBClient.doInvoke(AmazonDynamoDBClient.java:2089)
        at shadeaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:2065)
        at shadeaws.services.dynamodbv2.AmazonDynamoDBClient.executeGetItem(AmazonDynamoDBClient.java:1173)
        at shadeaws.services.dynamodbv2.AmazonDynamoDBClient.getItem(AmazonDynamoDBClient.java:1149)
        at me.chuwy.tranform.sdk.Manifest$.contains(Manifest.scala:179)
        at me.chuwy.tranform.DynamoDBState$$anonfun$getUnprocessed$1.apply(ProcessManifest.scala:44)
        at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
        at scala.collection.TraversableLike$class.filterNot(TraversableLike.scala:267)
        at scala.collection.AbstractTraversable.filterNot(Traversable.scala:104)
        at me.chuwy.transform.DynamoDBState$.getFoldersToProcess(DynamoDBState.scala:44)
        at me.chuwy.transform.Main$.main(Main.scala:19)
        at me.chuwy.transform.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)
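For context, getFoldersToProcess at that point did roughly the following (a reconstruction from the trace above, not the actual source; Manifest.contains is assumed to wrap a single DynamoDB GetItem per folder):

object DynamoDBState {
  // One blocking GetItem per folder, issued sequentially on the driver's main
  // thread, so for thousands of folders this runs for a long time before any
  // SparkContext exists.
  def getFoldersToProcess(allFolders: List[String]): List[String] =
    allFolders.filterNot(folder => Manifest.contains(folder))
}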

I believe that the current error has nothing to do with XML parsing or an invalid response, but originates from some race condition inside Spark, because:

  1. There's a clear connection between the amount of time "state-fetching" takes and the chance of failure.
  2. The tracebacks have an underlying AbortedException, which AFAIK is caused by a swallowed InterruptedException, which could mean that something inside the JVM (spark-submit or even YARN) calls Thread.sleep for the main thread (see the sketch after this list).
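To illustrate point 2, here is a minimal, self-contained sketch (my own illustration, not the SDK source; AbortedException below is a stand-in for shadeaws.AbortedException) of how interrupting a thread that checks its interrupt flag surfaces as exactly this kind of message-less exception:

object InterruptDemo {
  final class AbortedException extends RuntimeException  // stand-in for shadeaws.AbortedException

  // Assumed behavior of SdkFilterInputStream.abortIfNeeded, based on the trace:
  // if the current thread has been interrupted, throw AbortedException.
  def abortIfNeeded(): Unit =
    if (Thread.currentThread().isInterrupted) throw new AbortedException

  def main(args: Array[String]): Unit = {
    val worker = new Thread(new Runnable {
      def run(): Unit =
        try {
          while (true) { abortIfNeeded(); Thread.sleep(100) }  // pretend to read a response stream
        } catch {
          case _: AbortedException     => println("AbortedException, as in the traces above")
          case _: InterruptedException => println("the interrupt landed during sleep instead")
        }
    })
    worker.start()
    Thread.sleep(250)
    worker.interrupt()  // whatever interrupts the driver's main thread plays this role
    worker.join()
  }
}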

Right now I'm using EMR AMI 5.5.0, Spark 2.1.0 and the shaded AWS SDK 1.11.208, but I had a similar error with AWS SDK 1.10.75.

I'm deploying this job on EMR via command-runner.jar with spark-submit --deploy-mode cluster --class ....

Does anyone have an idea where this exception originates from and how to fix it?


Solution

  • The problem was that getFoldersToProcess is a blocking (and very long) operation, which prevents the SparkContext from being instantiated. In cluster mode, the SparkContext must signal its own instantiation to YARN; if that doesn't happen within a certain amount of time, YARN assumes the driver node has fallen off and kills the whole application.
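A minimal sketch of the fix, keeping the names from the question (Transformer.run is assumed here to accept the already-created context instead of building its own):

object App {
  def main(args: Array[String]): Unit = {
    // Create the SparkContext first: in cluster mode this lets the
    // ApplicationMaster register with YARN right away, instead of waiting
    // behind the long S3 listing and DynamoDB state fetching.
    val sc = new SparkContext()

    val allFolders = S3Folders.list()                                     // still slow, but harmless now
    val foldersToProcess = DynamoDBState.getFoldersToProcess(allFolders)
    Transformer.run(sc, foldersToProcess)
  }
}

The relevant timeout appears to be spark.yarn.am.waitTime (100 seconds by default, cluster mode only): if the SparkContext isn't initialized within that window, the application is torn down, which matches the observed correlation between state-fetching time and failure rate.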