Tags: scala, apache-spark, scalatest

Error with mvn test: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext


I am getting the following error only when running all the test cases with mvn test. It doesn't happen if I run each test class independently from the IDE. I am using the https://github.com/MrPowers/spark-fast-tests framework.

Trait

trait SparkSessionTestWrapper {

  lazy val spark: SparkSession = {
    SparkSession
      .builder()
      .master("local")
      .appName("spark-fast-tests test session")
      .config("spark.sql.shuffle.partitions", "1")
      .getOrCreate()
  }
}

Tests

WordCountDSAppTestSpec

class WordCountDSAppTestSpec extends FlatSpec with SparkSessionTestWrapper with DatasetComparer {

  import spark.implicits._

  "toWords" should "split the file into words" in {
    val sourceDf = Seq(
      ("one"),
      ("two"),
      (""),
      ("three Three")
    ).toDF("line").as[Line]

    val expectedDF = Seq(
      ("one", "one"),
      ("two", "two"),
      ("three Three", "three"),
      ("three Three", "Three"),
      ("", "")
    ).toDF("line", "word").as[LineAndWord]

    val actualDF = WordCountDSApp.toWords(sourceDf)

    assertSmallDatasetEquality(actualDF, expectedDF, orderedComparison = false)
  }

  "countWords" should "return count of each word" in {

    val wordsDF = Seq(
      ("one", "one"),
      ("two", "two"),
      ("three Three", "three"),
      ("three Three", "Three"),
      ("", "")
    ).toDF("line", "word").as[LineAndWord]

    val tupleEncoder = Encoders.tuple(Encoders.STRING, Encoders.LONG)
    val expectedDF = Seq(
      ("one", 1L),
      ("two", 1L),
      ("three", 2L)
    ).toDF("value", "count(1)").as[(String, Long)]

    val actualDF = WordCountDSApp.countWords(wordsDF)

    assertSmallDatasetEquality(actualDF, expectedDF, orderedComparison = false)
  }
}

ProductSalesAppTestSpec

class ProductSalesAppTestSpec extends FlatSpec with SparkSessionTestWrapper with DatasetComparer {

  import spark.implicits._

  val productCols = Seq("product_id", "product_name", "price")
  val productDF = Seq(
    ("0", "product_0", "22"),
    ("1", "product_1", "30"),
    ("2", "product_2", "91")
  ).toDF(productCols: _*)
  productDF.createOrReplaceTempView("PRODUCTS")

  val orderCols = Seq("order_id", "product_id", "seller_id", "date", "num_pieces_sold", "bill_raw_text")

  "howManyProductsHaveBeenSoldAtLeastOnce" should "return zero when no orders are made for products" in {
    val orderDF = Seq(
      ("1", "10", "0", "2020-07-10", "26", "kyeibuumwlyhuwksx"),
      ("2", "20", "0", "2020-07-08", "13", "kyeibuumwlyhuwksx"),
      ("3", "30", "0", "2020-07-05", "38", "kyeibuumwlyhuwksx"),
      ("4", "40", "0", "2020-07-05", "56", "kyeibuumwlyhuwksx")
    ).toDF(orderCols: _*)
    orderDF.createOrReplaceTempView("ORDERS")

    assert(ProductSalesApp.howManyProductsHaveBeenAoldAtLeastOnce(spark) == 0)
  }
}

Error

- should return zero when no orders are made for products *** FAILED ***
  org.apache.spark.SparkException: Exception thrown in awaitResult:
  at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
  at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:146)
  at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:387)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
  at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:117)
  ...
  Cause: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:926)
com.aravind.oss.SparkSessionTestWrapper$class.spark(SparkSessionTestWrapper.scala:16)
com.aravind.oss.eg.spark.sales.ProductSalesAppTestSpec.spark$lzycompute(ProductSalesAppTestSpec.scala:13)
com.aravind.oss.eg.spark.sales.ProductSalesAppTestSpec.spark(ProductSalesAppTestSpec.scala:13)
com.aravind.oss.eg.spark.sales.ProductSalesAppTestSpec.<init>(ProductSalesAppTestSpec.scala:18)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:422)
java.lang.Class.newInstance(Class.java:442)
org.scalatest.tools.DiscoverySuite$.getSuiteInstance(DiscoverySuite.scala:66)
org.scalatest.tools.DiscoverySuite$$anonfun$1.apply(DiscoverySuite.scala:38)
org.scalatest.tools.DiscoverySuite$$anonfun$1.apply(DiscoverySuite.scala:37)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
scala.collection.Iterator$class.foreach(Iterator.scala:891)
scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
scala.collection.AbstractIterable.foreach(Iterable.scala:54)
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

The currently active SparkContext was created at:

org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:926)
com.aravind.oss.SparkSessionTestWrapper$class.spark(SparkSessionTestWrapper.scala:16)
com.aravind.oss.eg.spark.wordcount.WordCountDSAppTestSpec.spark$lzycompute(WordCountDSAppTestSpec.scala:15)
com.aravind.oss.eg.spark.wordcount.WordCountDSAppTestSpec.spark(WordCountDSAppTestSpec.scala:15)
com.aravind.oss.eg.spark.wordcount.WordCountDSAppTestSpec$$anonfun$1.apply$mcV$sp(WordCountDSAppTestSpec.scala:20)
com.aravind.oss.eg.spark.wordcount.WordCountDSAppTestSpec$$anonfun$1.apply(WordCountDSAppTestSpec.scala:19)
com.aravind.oss.eg.spark.wordcount.WordCountDSAppTestSpec$$anonfun$1.apply(WordCountDSAppTestSpec.scala:19)
org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
org.scalatest.Transformer.apply(Transformer.scala:22)
org.scalatest.Transformer.apply(Transformer.scala:20)
org.scalatest.FlatSpecLike$$anon$1.apply(FlatSpecLike.scala:1682)
org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
org.scalatest.FlatSpec.withFixture(FlatSpec.scala:1685)
org.scalatest.FlatSpecLike$class.invokeWithFixture$1(FlatSpecLike.scala:1679)
org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1692)
org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1692)
org.scalatest.SuperEngine.runTestImpl(Engine.scala:286)
org.scalatest.FlatSpecLike$class.runTest(FlatSpecLike.scala:1692)
org.scalatest.FlatSpec.runTest(FlatSpec.scala:1685)
  at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:100)
  at org.apache.spark.SparkContext.defaultParallelism(SparkContext.scala:2359)
  at org.apache.spark.sql.execution.LocalTableScanExec.numParallelism$lzycompute(LocalTableScanExec.scala:49)
  at org.apache.spark.sql.execution.LocalTableScanExec.numParallelism(LocalTableScanExec.scala:48)
  at org.apache.spark.sql.execution.LocalTableScanExec.rdd$lzycompute(LocalTableScanExec.scala:51)
  at org.apache.spark.sql.execution.LocalTableScanExec.rdd(LocalTableScanExec.scala:51)
  at org.apache.spark.sql.execution.LocalTableScanExec.doExecute(LocalTableScanExec.scala:55)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  ...

Solution

  • I checked your git repo. After entering this test case:

      "howManyProductsHaveBeenSoldAtLeastOnce" should "return zero when no orders are made for products" in {

    When the tests go through mvn test and the Spark session comes from the trait, the session ends up being used from a different thread and its SparkContext has already been reset (stopped) by the time this test case runs. From the IntelliJ IDE everything runs in the same thread, so nothing resets Spark (the context is never stopped) and the tests pass. Since the trait's getOrCreate() gives every suite in the JVM the same underlying SparkContext, stopping it anywhere leaves every suite that runs afterwards with a dead context (a minimal illustration of this sharing is sketched after the result output below).

    I was able to confirm this by running Maven without forking (mvn -DforkMode=never test -f pom.xml) and by printing spark.sparkContext.isStopped, as shown in the code below.

    That is why the tests work from the IDE but not via mvn test.


    So I dropped the idea of getting the Spark session from a trait and instead created a local Spark session inside the suite, as shown below (for cleaner code you could also manage the session with org.scalatest.BeforeAndAfterAll; a sketch of that approach is at the end of this answer).

    package com.aravind.oss.eg.spark.wordcount

    import com.aravind.oss.eg.spark.sales.ProductSalesApp
    import com.github.mrpowers.spark.fast.tests.DatasetComparer
    import org.apache.spark.sql.SparkSession
    import org.scalatest.FlatSpec

    class ProductSalesAppTestSpec extends FlatSpec with DatasetComparer {

      // The session now belongs to this suite instead of coming from the shared trait
      lazy implicit val spark: SparkSession =
        SparkSession
          .builder()
          .master("local")
          .appName("spark-fast-tests test session")
          .config("spark.sql.shuffle.partitions", "1")
          .getOrCreate()

      import spark.implicits._

      val productCols = Seq("product_id", "product_name", "price")
      val productDF = Seq(
        ("0", "product_0", "22"),
        ("1", "product_1", "30"),
        ("2", "product_2", "91")
      ).toDF(productCols: _*)
      productDF.createOrReplaceTempView("PRODUCTS")

      val orderCols = Seq("order_id", "product_id", "seller_id", "date", "num_pieces_sold", "bill_raw_text")
      val orderDF = Seq(
        ("1", "10", "0", "2020-07-10", "26", "kyeibuumwlyhuwksx"),
        ("2", "20", "0", "2020-07-08", "13", "kyeibuumwlyhuwksx"),
        ("3", "30", "0", "2020-07-05", "38", "kyeibuumwlyhuwksx"),
        ("4", "40", "0", "2020-07-05", "56", "kyeibuumwlyhuwksx")
      ).toDF(orderCols: _*)
      orderDF.createOrReplaceTempView("ORDERS")
      orderDF.show()

      // Confirms the context is alive and local while the suite is being constructed
      println(spark.sparkContext.isStopped)
      println(spark.sparkContext.isLocal)

      "howManyProductsHaveBeenSoldAtLeastOnce" should "return zero when no orders are made for products" in {
        // ...and again inside the test itself, right before the query runs
        println("howManyProductsHaveBeenSoldAtLeastOnce " + spark.sparkContext.isStopped)
        println(spark.sparkContext.isLocal)
        println(ProductSalesApp.howManyProductsHaveBeenAoldAtLeastOnce(spark))
        assert(ProductSalesApp.howManyProductsHaveBeenAoldAtLeastOnce(spark) == 0)
      }
    }
    
    

    Result:

    +--------+----------+---------+----------+---------------+-----------------+
    |order_id|product_id|seller_id|      date|num_pieces_sold|    bill_raw_text|
    +--------+----------+---------+----------+---------------+-----------------+
    |       1|        10|        0|2020-07-10|             26|kyeibuumwlyhuwksx|
    |       2|        20|        0|2020-07-08|             13|kyeibuumwlyhuwksx|
    |       3|        30|        0|2020-07-05|             38|kyeibuumwlyhuwksx|
    |       4|        40|        0|2020-07-05|             56|kyeibuumwlyhuwksx|
    +--------+----------+---------+----------+---------------+-----------------+
    
    false
    true
    howManyProductsHaveBeenSoldAtLeastOnce false
    true
    0
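
    For reference, here is a minimal, stand-alone sketch (not taken from the repo above) of the sharing behaviour mentioned earlier: every builder that calls getOrCreate() in the same JVM ends up on the same SparkContext, so stopping it through any one handle leaves every suite that runs later with a dead context, which is exactly the IllegalStateException above.

    import org.apache.spark.sql.SparkSession

    // Hypothetical demo object, not part of the project under test
    object SharedContextDemo extends App {
      val first  = SparkSession.builder().master("local").getOrCreate()
      val second = SparkSession.builder().master("local").getOrCreate()

      // Both sessions wrap the very same SparkContext (there is one context per JVM)
      assert(first.sparkContext eq second.sparkContext)

      // Stopping it through either handle stops it for everyone
      first.stop()
      println(second.sparkContext.isStopped) // prints: true
    }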
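
    If you prefer a reusable lifecycle instead of an inline session, one possible shape for the BeforeAndAfterAll idea mentioned above is sketched below. It is a hypothetical suite (ProductSalesLifecycleSpec is not in the repo), assuming each suite owns its session and only stops it after its last test.

    import org.apache.spark.sql.SparkSession
    import org.scalatest.{BeforeAndAfterAll, FlatSpec}

    class ProductSalesLifecycleSpec extends FlatSpec with BeforeAndAfterAll {

      // lazy val keeps `import spark.implicits._` usable (a stable identifier)
      lazy val spark: SparkSession =
        SparkSession
          .builder()
          .master("local")
          .appName("spark-fast-tests test session")
          .config("spark.sql.shuffle.partitions", "1")
          .getOrCreate()

      // Stop the context only after the last test of this suite; the next suite's
      // getOrCreate() then builds a fresh context instead of inheriting a stopped one
      override def afterAll(): Unit = {
        spark.stop()
      }

      "the session" should "stay usable in every test of this suite" in {
        import spark.implicits._
        val words = Seq("one", "two").toDF("word")
        assert(words.count() == 2)
      }
    }

    The important part is that nothing stops the session before afterAll(), so the suites no longer step on each other's SparkContext.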