Tags: scala, apache-spark, sbt, scalatest, apache-spark-standalone

Unit Tests using Spark Session : SparkContext was shut down


We have a large project with multiple test suites, each containing three tests on average.

For our unit tests we run Spark in standalone mode, so there is no YARN resource manager. Every test suite:

initializes a Spark session:

  implicit val spark: SparkSession = SparkSession
    .builder()
    .config(sparkConf)
    .getOrCreate()

extends BeforeAndAfterAll:

class MyTestsSpec extends WordSpec
  with Matchers
  with BeforeAndAfterAll {
...
}

and overrides afterAll (a complete suite combining these pieces is sketched below):

  override def afterAll(): Unit = {
    try {
      spark.stop()
    } finally {
      super.afterAll()
    }
  }
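
Putting the three pieces together, a minimal suite following this pattern might look like the sketch below. The question does not show how sparkConf is built, so a local-mode master is assumed here, and the test body is purely illustrative:

  import org.apache.spark.SparkConf
  import org.apache.spark.sql.SparkSession
  import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}

  class MyTestsSpec extends WordSpec
    with Matchers
    with BeforeAndAfterAll {

    // Assumption: the question does not show sparkConf, so a local-mode
    // config is built here purely for illustration.
    private val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("MyTestsSpec")

    implicit val spark: SparkSession = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()

    override def afterAll(): Unit = {
      try {
        spark.stop() // the per-suite stop that the answer below ends up removing
      } finally {
        super.afterAll()
      }
    }

    "my Spark job" should {
      "double every element" in { // illustrative test body
        import spark.implicits._
        val doubled = Seq(1, 2, 3).toDS().map(_ * 2).collect()
        doubled should contain theSameElementsAs Seq(2, 4, 6)
      }
    }
  }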

Our project has a CI job in Jenkins, and that job started turning Unstable more and more often because of tests failing with the following error:

Error message
Job 9 cancelled because SparkContext was shut down
Stack trace
org.apache.spark.SparkException: Job 9 cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:820)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:818)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:818)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1732)
    at org.apache.spark.util.EventLoop.stop(EventLoop.scala:83)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1651)
    at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1921)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1317)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1920)
    at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:581)
    at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
    at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:278)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2853)
    at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2390)
    at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2390)
    at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2390)
// some business classes
    at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:1078)
    at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
    at org.scalatest.WordSpec.withFixture(WordSpec.scala:1881)
    at org.scalatest.WordSpecLike$class.invokeWithFixture$1(WordSpecLike.scala:1075)
    at org.scalatest.WordSpecLike$$anonfun$runTest$1.apply(WordSpecLike.scala:1088)
    at org.scalatest.WordSpecLike$$anonfun$runTest$1.apply(WordSpecLike.scala:1088)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
    at org.scalatest.WordSpecLike$class.runTest(WordSpecLike.scala:1088)
    at org.scalatest.WordSpec.runTest(WordSpec.scala:1881)
    at org.scalatest.WordSpecLike$$anonfun$runTests$1.apply(WordSpecLike.scala:1147)
    at org.scalatest.WordSpecLike$$anonfun$runTests$1.apply(WordSpecLike.scala:1147)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:373)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:410)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
    at org.scalatest.WordSpecLike$class.runTests(WordSpecLike.scala:1147)
    at org.scalatest.WordSpec.runTests(WordSpec.scala:1881)
    at org.scalatest.Suite$class.run(Suite.scala:1147)
    at org.scalatest.WordSpec.org$scalatest$WordSpecLike$$super$run(WordSpec.scala:1881)
    at org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1192)
    at org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1192)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
    at org.scalatest.WordSpecLike$class.run(WordSpecLike.scala:1192)
// some business classes
    at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
    at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
// some business classes
    at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
    at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
    at sbt.TestRunner.runTest$1(TestFramework.scala:106)
    at sbt.TestRunner.run(TestFramework.scala:117)
    at sbt.TestFramework$$anon$2$$anonfun$$lessinit$greater$1.$anonfun$apply$1(TestFramework.scala:262)
    at sbt.TestFramework$.sbt$TestFramework$$withContextLoader(TestFramework.scala:233)
    at sbt.TestFramework$$anon$2$$anonfun$$lessinit$greater$1.apply(TestFramework.scala:262)
    at sbt.TestFramework$$anon$2$$anonfun$$lessinit$greater$1.apply(TestFramework.scala:262)
    at sbt.TestFunction.apply(TestFramework.scala:271)
    at sbt.Tests$.processRunnable$1(Tests.scala:307)
    at sbt.Tests$.$anonfun$makeSerial$1(Tests.scala:313)
    at sbt.std.Transform$$anon$3.$anonfun$apply$2(System.scala:46)
    at sbt.std.Transform$$anon$4.work(System.scala:66)
    at sbt.Execute.$anonfun$submit$2(Execute.scala:262)
    at sbt.internal.util.ErrorHandling$.wideConvert(ErrorHandling.scala:16)
    at sbt.Execute.work(Execute.scala:271)
    at sbt.Execute.$anonfun$submit$1(Execute.scala:262)
    at sbt.ConcurrentRestrictions$$anon$4.$anonfun$submitValid$1(ConcurrentRestrictions.scala:174)
    at sbt.CompletionService$$anon$2.call(CompletionService.scala:36)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Yet when we run a failing test suite on its own, it succeeds without any problem.


Solution

  • I had a similar issue.

    I had multiple test suites using Spark, and only the first suite worked. I was calling spark.close() in all of them; after removing this call from every suite, everything passed.

    After investigating the SparkSession code, my conclusion was that since only one SparkContext can exist per JVM and the tests all run in the same JVM, once you stop it the first time it becomes unusable for the rest of that "JVM session".
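
    One way to apply this across many suites is to share a single session instead of stopping it per suite. Below is a minimal sketch of that idea; the SharedSparkSession name is illustrative, not part of any library:

      import org.apache.spark.sql.SparkSession

      // Hypothetical shared-session helper: every suite reuses the same
      // SparkSession, and no suite ever calls stop()/close(), so the one
      // SparkContext stays alive for the whole JVM test run.
      object SharedSparkSession {
        // getOrCreate() returns the already-running session on every call
        // after the first, so suites can be initialized in any order.
        lazy val instance: SparkSession = SparkSession
          .builder()
          .master("local[*]")
          .appName("shared-test-session")
          .getOrCreate()
      }

      trait SharedSparkSession {
        implicit lazy val spark: SparkSession = SharedSparkSession.instance
      }

    With this in place, a suite just mixes in the trait (class MyTestsSpec extends WordSpec with Matchers with SharedSparkSession) and drops its afterAll override; Spark's own shutdown hook stops the context once, when the test JVM exits.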