scala, apache-spark

Scala object/singleton vals are not initialized the same way when using the spark-shell


I'm looking for some help with how object/singleton vals are initialized when using the spark-shell.

Test code:

import org.apache.spark.sql.SparkSession

object Consts {
    val f2 = 2
}
object Test extends Serializable {
    val f1 = 1
    println(s"-- init Test singleton f1=${f1} f2=${Consts.f2}")
    def doWorkWithF1(x: Int)  = {
        f1
    }
    def doPartitionWorkWithF1(partitionId: Int, iter: Iterator[Int])  = {
        iter.map(x => f1)
    }
    def doPartitionWorkWithF2(partitionId: Int, iter: Iterator[Int])  = {
        iter.map(x => Consts.f2)
    }
    def main(args: Array[String]) {
        println(s"-- main starting f1=${f1} f2=${Consts.f2}")
        val spark = SparkSession.builder().getOrCreate()
        val rdd = spark.sparkContext.parallelize(List(1,2,3,4))
        rdd.map(doWorkWithF1).foreach(print)
        rdd.mapPartitionsWithIndex(doPartitionWorkWithF1).foreach(print)
        rdd.mapPartitionsWithIndex(doPartitionWorkWithF2).foreach(print)
    }
}

Running:

$ spark-shell --master local[4]
scala> :paste "test.scala"
...
defined object Consts
defined object Test

scala> Test.main(Array())
-- init Test singleton f1=1 f2=2
-- main starting f1=1 f2=2
11110000
23/02/22 21:03:31 ERROR executor.Executor: Exception in task 1.0 in stage 2.0 (TID 9)
java.lang.NullPointerException
        at $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Test$$anonfun$doPartitionWorkWithF2$1.apply$mcII$sp(test.scala:37)
        at $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Test$$anonfun$doPartitionWorkWithF2$1.apply(test.scala:37)
        at $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Test$$anonfun$doPartitionWorkWithF2$1.apply(test.scala:37)
...
  • doWorkWithF1 (using map) works as I expect: the output is 1111.
  • In doPartitionWorkWithF1, the output is not what I expect: it is 0000. Why is val f1 seen as 0 and not 1?
    Asked another way: under what circumstances is an integer val in an object singleton left at its default of 0?
  • In doPartitionWorkWithF2, I assume the NullPointerException occurs because f2 is null. Why is that?
    Asked another way: under what circumstances is a val in an object singleton left as null? (A small illustration of this default-value behaviour follows this list.)
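
For context on those particular values: 0 and null are the JVM's default values for an Int field and a reference field, i.e. what a field holds when nothing ever writes to it on the receiving side of a serialization round trip. Below is a minimal sketch of that behaviour outside Spark, using plain Java serialization with @transient fields to force the situation (Holder, DefaultsDemo and roundTrip are illustrative names, not part of the original program):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

    // @transient fields are skipped during serialization, so nothing writes them
    // on the receiving side and they come back as JVM defaults:
    // 0 for Int, null for references.
    class Holder extends Serializable {
        @transient val i: Int = 1
        @transient val s: String = "one"
    }

    object DefaultsDemo {
        def roundTrip(obj: AnyRef): AnyRef = {
            val buf = new ByteArrayOutputStream()
            val out = new ObjectOutputStream(buf)
            out.writeObject(obj)
            out.close()
            val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
            in.readObject()
        }
        def main(args: Array[String]): Unit = {
            val h = roundTrip(new Holder).asInstanceOf[Holder]
            println(s"i=${h.i} s=${h.s}")   // prints: i=0 s=null
        }
    }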

Adding lazy to the declaration of f1

    lazy val f1 = 1

makes doPartitionWorkWithF1 work as I expect: 1111 is the result in the spark-shell.
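
For reference, the general advice in the Spark programming guide for this class of problem is to copy the value you need into a local val on the driver, so the closure captures only that value and the enclosing objects never need to be shipped. A hedged sketch of what that could look like inside main (the localF2 name and the inline lambda are illustrative, not part of the original program, and this has not been verified against the exact spark-shell scenario above):

    // Read Consts.f2 once on the driver; the closure then captures only an Int,
    // so neither Test nor Consts needs to be captured and serialized.
    val localF2 = Consts.f2
    rdd.mapPartitionsWithIndex((partitionId, iter) => iter.map(_ => localF2)).foreach(print)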

And this is where Spark gets frustrating to work with: if the original version (without lazy) is compiled and run with spark-submit, I get the expected result:

$ /usr/bin/spark-submit --master local[4] --driver-memory 1024m --name "TEST" --class Test test.jar 2> err
-- init Test singleton f1=1 f2=2
-- main starting f1=1 f2=2
111111112222
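
For completeness, a hypothetical build.sbt that could be used to package test.scala for the spark-submit invocation above. The post does not show its build, so the versions below are assumptions (chosen to match the Scala 2.12 / Spark 3.3 shell in the answer), and `sbt package` would produce a jar under target/scala-2.12/ rather than the literal test.jar name used above:

    // Hypothetical build definition; versions are assumptions, not from the post.
    name := "test"
    version := "0.1.0-SNAPSHOT"
    scalaVersion := "2.12.15"
    // "provided" keeps Spark itself out of the jar, since spark-submit supplies it.
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.3.2" % "provided"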

I really don't like having to write code differently just so it works in the spark-shell, but the shell is so convenient that I do it anyway. These kinds of nuances cost me a lot of time and effort, though. The above is the salient part of a 2000-line program, and it took me hours to figure out where in the code the shell was doing something different from the compiled version.


Solution

  • In the spark-shell, Spark ends up serializing the objects themselves instead of just sending their class definitions to the executors.

    In your case the Consts object is not serializable, so an empty object is sent to the executors and its values come back as null.

    I couldn't find a proper workaround for Spark 2.x, but the fix below (the only change is making Consts serializable, with a @SerialVersionUID) works fine with Spark 3:

    test.scala

    import org.apache.spark.sql.SparkSession
    
    @SerialVersionUID(123L)
    object Consts extends Serializable {
        val f2 = 2
    }
    object Test extends Serializable {
        val f1 = 1
        println(s"-- init Test singleton f1=${f1} f2=${Consts.f2}")
        def doWorkWithF1(x: Int)  = {
            f1
        }
        def doPartitionWorkWithF1(partitionId: Int, iter: Iterator[Int])  = {
            iter.map(x => f1)
        }
        def doPartitionWorkWithF2(partitionId: Int, iter: Iterator[Int])  = {
            iter.map(x => Consts.f2)
        }
        def main(args: Array[String]) {
            println(s"-- main starting f1=${f1} f2=${Consts.f2}")
            val spark = SparkSession.builder().getOrCreate()
            val rdd = spark.sparkContext.parallelize(List(1,2,3,4))
            rdd.map(doWorkWithF1).foreach(print)
            rdd.mapPartitionsWithIndex(doPartitionWorkWithF1).foreach(print)
            rdd.mapPartitionsWithIndex(doPartitionWorkWithF2).foreach(print)
        }
    }
    

    Terminal output:

    ➜  ~ ~/Downloads/spark-3.3.2-bin-hadoop3/bin/spark-shell --master "local[4]"
    23/03/02 03:16:24 WARN Utils: Your hostname, my.local resolves to a loopback address: 127.0.0.1; using 172.17.4.232 instead (on interface en0)
    23/03/02 03:16:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    23/03/02 03:16:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Spark context Web UI available at http://172.17.4.232:4040
    Spark context available as 'sc' (master = local[*], app id = local-1677726988083).
    Spark session available as 'spark'.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 3.3.2
          /_/
    
    Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 1.8.0_362)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala> :paste "test.scala"
    Pasting file test.scala...
    import org.apache.spark.sql.SparkSession
    defined object Consts
    defined object Test
    
    scala> Test.main(Array())
    -- init Test singleton f1=1 f2=2
    -- main starting f1=1 f2=2
    11112222