Scala Test SparkException: Task not serializable


I'm new to Scala and Spark. I wrote a simple test class and have been stuck on this issue for the whole day. The code is below.

A.scala

class A(key: String) extends Serializable {
     // Returns the key passed to the constructor
     def getKey(): String = key
}

B.scala

class B(key: String) extends Serializable {
     // Returns the key passed to the constructor
     def getKey(): String = key
}

Test.scala

import com.holdenkarau.spark.testing.{RDDComparisons, SharedSparkContext}
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter

class Test extends FunSuite with SharedSparkContext with RDDComparisons with BeforeAndAfter with Serializable {

  //comment this
  private[this] val b1 = new B("test1")

  test("Test RDD") {

    val a1 = new A("test1")
    val a2 = new A("test2")

    val expected= sc.parallelize(Seq(a1,a2))
    println(b1.getKey())
    //val b1 = new B("test1")
    //val key1 :String = b1.getKey()
    expected.foreach{ a =>
      //if(a.getKey().equalsIgnoreCase(key1))
      if(a.getKey().equalsIgnoreCase(b1.getKey()))
        print("hi")
    }
  }
}

This code throws the following exception:

Task not serializable
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:925)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:925)
    at com.adgear.adata.hhid.Test$$anonfun$1.apply$mcV$sp(Test.scala:19)
    at com.adgear.adata.hhid.Test$$anonfun$1.apply(Test.scala:11)
    at com.adgear.adata.hhid.Test$$anonfun$1.apply(Test.scala:11)
    at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
    at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
    at org.scalatest.FunSuite.withFixture(FunSuite.scala:1560)
    at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
    at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
    at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
    at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
    at org.scalatest.FunSuite.runTest(FunSuite.scala:1560)
    at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
    at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
    at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
    at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
    at org.scalatest.Suite$class.run(Suite.scala:1147)
    at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
    at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
    at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
    at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
    at com.adgear.adata.hhid.Test.org$scalatest$BeforeAndAfterAll$$super$run(Test.scala:7)
    at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
    at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
    at com.adgear.adata.hhid.Test.run(Test.scala:7)

When I comment out the class-level declaration of b1 and declare it inside the test method instead, "if(a.getKey().equalsIgnoreCase(b1.getKey()))" works. If I keep the class-level b1 definition, the same line throws the exception above. To work around it, I have to use "val key1 :String = b1.getKey()" and "if(a.getKey().equalsIgnoreCase(key1))"; then it works.

As one can see, A, B, and Test all implement Serializable, yet I still get this exception. What is causing this issue?

Thanks


Solution

  • Declaring a class as Serializable does not guarantee that it can be serialized: every non-transient field it holds must be serializable as well.

    Since your Test class extends FunSuite, it has an "assertionsHelper" field which is not Serializable. When you reference the "b1" field inside the closure passed to "foreach", you are really referencing "this.b1", so the closure captures the enclosing Test instance and Spark tries to serialize that instance along with all its fields (including assertionsHelper).

    If you want to avoid this, you'll have to either define b1 somewhere else (in the test method scope or in a companion object), or copy b1 into a local variable before using it in the foreach function:

    val b1_ref = b1
    expected.foreach { a =>
      if (a.getKey().equalsIgnoreCase(b1_ref.getKey()))
        print("hi")
    }
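
    The capture behaviour described above can be reproduced without Spark or ScalaTest. The following is a minimal sketch, assuming Scala 2.12 or later, that uses plain Java serialization in place of Spark's closure check. The names Helper, FakeSuite and ClosureCaptureDemo are hypothetical stand-ins: Helper plays the role of the non-serializable assertionsHelper, and FakeSuite plays the role of the Test class.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable member such as assertionsHelper.
class Helper

class B(val key: String) extends Serializable

// Marked Serializable, but it holds a field (helper) that is not.
class FakeSuite extends Serializable {
  val helper = new Helper
  val b1 = new B("test1")

  // Uses the field b1 (that is, this.b1), so the closure captures `this`.
  def capturesThis: String => Boolean =
    a => a.equalsIgnoreCase(b1.key)

  // Copies the field into a local val first, so the closure captures only that B.
  def capturesLocal: String => Boolean = {
    val b1Ref = b1
    a => a.equalsIgnoreCase(b1Ref.key)
  }
}

object ClosureCaptureDemo {
  // Returns true if Java serialization of obj succeeds, false if it hits
  // a non-serializable object somewhere in the object graph.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val suite = new FakeSuite
    println(serializes(suite.capturesThis))  // false: drags in FakeSuite and its Helper
    println(serializes(suite.capturesLocal)) // true: only the serializable B is captured
  }
}
```

    The second closure is the same trick as b1_ref above: once the value lives in a local variable, the closure no longer needs the enclosing instance.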
    

    PS: When you encounter a serialization exception, the logs usually include a "serialization stack" that tells you exactly which object caused the error.
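
    If the logs do not show enough detail, the JVM itself can report the chain of fields that led to the non-serializable object through the sun.io.serialization.extendedDebugInfo system property. A minimal sketch of turning it on for the test run, assuming the project is built with sbt 1.x (the question does not say which build tool is used, so adapt this for Maven or Gradle):

```scala
// build.sbt (assumption: sbt 1.x)
// javaOptions only take effect in a forked JVM, so fork the test run.
Test / fork := true
Test / javaOptions += "-Dsun.io.serialization.extendedDebugInfo=true"
```

    With the property set, the NotSerializableException message should list the field path from the serialized root down to the offending object.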