Tags: apache-spark, pyspark, amazon-emr

AWS EMR Spark job restarts [AsyncEventQueue: Dropping event from queue appStatus.]


My PySpark job (about 2 hours, processing 20 GB of input and writing roughly 40 MB of output) restarts even after a successful run: the logs show completion and the data has already been written to S3. I tried pyspark 2.3.0 and 2.3.1 on emr-5.14.0 and emr-5.16.0.

The relevant log output and traceback:

18/08/22 17:45:13 ERROR AsyncEventQueue: Dropping event from queue appStatus. This likely means one of the listeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler.
18/08/22 17:45:13 WARN AsyncEventQueue: Dropped 1 events from appStatus since Thu Jan 01 00:00:00 UTC 1970.
18/08/22 17:46:28 WARN AsyncEventQueue: Dropped 25523 events from appStatus since Wed Aug 22 17:45:13 UTC 2018.
18/08/22 17:47:28 WARN AsyncEventQueue: Dropped 3417 events from appStatus since Wed Aug 22 17:46:28 UTC 2018.
18/08/22 17:48:28 WARN AsyncEventQueue: Dropped 3669 events from appStatus since Wed Aug 22 17:47:28 UTC 2018.
18/08/22 17:49:28 WARN AsyncEventQueue: Dropped 7725 events from appStatus since Wed Aug 22 17:48:28 UTC 2018.
18/08/22 17:50:28 WARN AsyncEventQueue: Dropped 6609 events from appStatus since Wed Aug 22 17:49:28 UTC 2018.
18/08/22 17:53:44 WARN AsyncEventQueue: Dropped 2272 events from appStatus since Wed Aug 22 17:50:28 UTC 2018.
18/08/22 17:54:39 WARN ShutdownHookManager: ShutdownHook '$anon$2' timeout, java.util.concurrent.TimeoutException
java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)
    at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:67)
18/08/22 17:54:39 ERROR Utils: Uncaught exception in thread pool-4-thread-1
java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.Thread.join(Thread.java:1252)
    at java.lang.Thread.join(Thread.java:1326)
    at org.apache.spark.scheduler.AsyncEventQueue.stop(AsyncEventQueue.scala:135)
    at org.apache.spark.scheduler.LiveListenerBus$$anonfun$stop$1.apply(LiveListenerBus.scala:219)
    at org.apache.spark.scheduler.LiveListenerBus$$anonfun$stop$1.apply(LiveListenerBus.scala:219)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at org.apache.spark.scheduler.LiveListenerBus.stop(LiveListenerBus.scala:219)
    at org.apache.spark.SparkContext$$anonfun$stop$6.apply$mcV$sp(SparkContext.scala:1922)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1360)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1921)
    at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:573)
    at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Solution

  • Found the answer here [1].

    tl;dr:

    To resolve this issue, explicitly invoke sparkContext.stop() before exiting the application.

    [1] https://community.hortonworks.com/content/supportkb/208452/warn-shutdownhookmanager-shutdownhook-anon2-timeou.html
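The pattern from the answer above can be sketched as follows: wrap the job body in `try`/`finally` and call `stop()` explicitly, so Spark's shutdown hook doesn't have to tear down the listener bus under its timeout. Since a real Spark cluster isn't available here, `FakeSession` is a stand-in for `pyspark.sql.SparkSession` used purely to make the pattern runnable; in the real job you would create the session with `SparkSession.builder.appName(...).getOrCreate()` and call `spark.stop()` the same way.

```python
class FakeSession:
    """Stand-in for pyspark.sql.SparkSession (illustration only)."""

    def __init__(self):
        self.stopped = False

    def stop(self):
        # In real PySpark, this stops the underlying SparkContext,
        # flushing the listener bus before the JVM shutdown hook runs.
        self.stopped = True


def run_job(spark):
    # ... read input from S3, transform, write the result back ...
    return "ok"


def main():
    spark = FakeSession()  # real code: SparkSession.builder.getOrCreate()
    try:
        result = run_job(spark)
    finally:
        # Explicit stop: don't rely on Spark's shutdown hook, which can
        # time out (TimeoutException) and make the step look failed.
        spark.stop()
    return result, spark.stopped


print(main())
```

The key point is the `finally` block: even if the job body raises, the context is stopped cleanly, which avoids the `ShutdownHook '$anon$2' timeout` seen in the log above.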