I'm looking for some help with how object/singleton vals are initialized when using the spark-shell.
Test code:
import org.apache.spark.sql.SparkSession
object Consts {
  val f2 = 2
}
object Test extends Serializable {
  val f1 = 1
  println(s"-- init Test singleton f1=${f1} f2=${Consts.f2}")
  def doWorkWithF1(x: Int) = {
    f1
  }
  def doPartitionWorkWithF1(partitionId: Int, iter: Iterator[Int]) = {
    iter.map(x => f1)
  }
  def doPartitionWorkWithF2(partitionId: Int, iter: Iterator[Int]) = {
    iter.map(x => Consts.f2)
  }
  def main(args: Array[String]) {
    println(s"-- main starting f1=${f1} f2=${Consts.f2}")
    val spark = SparkSession.builder().getOrCreate()
    val rdd = spark.sparkContext.parallelize(List(1,2,3,4))
    rdd.map(doWorkWithF1).foreach(print)
    rdd.mapPartitionsWithIndex(doPartitionWorkWithF1).foreach(print)
    rdd.mapPartitionsWithIndex(doPartitionWorkWithF2).foreach(print)
  }
}
Running:
$ spark-shell --master local[4]
scala> :paste "test.scala"
...
defined object Consts
defined object Test
scala> Test.main(Array())
-- init Test singleton f1=1 f2=2
-- main starting f1=1 f2=2
11110000
23/02/22 21:03:31 ERROR executor.Executor: Exception in task 1.0 in stage 2.0 (TID 9)
java.lang.NullPointerException
at $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Test$$anonfun$doPartitionWorkWithF2$1.apply$mcII$sp(test.scala:37)
at $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Test$$anonfun$doPartitionWorkWithF2$1.apply(test.scala:37)
at $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Test$$anonfun$doPartitionWorkWithF2$1.apply(test.scala:37)
...
doWorkWithF1 (using map) works as I expect: the output is 1111.
With doPartitionWorkWithF1, the output is not what I expect: it is 0000. Why is val f1 set to 0 and not 1?
With doPartitionWorkWithF2, I assume the null pointer exception is because f2 is null. Why is that?
Changing line 6 to add lazy:
lazy val f1 = 1
makes doPartitionWorkWithF1 work as I desire/expect, i.e., 1111 is the result in the spark-shell.
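This lazy behavior is close to the @transient lazy val idiom often seen in Spark code: a field excluded from serialization is recomputed from its definition on first access after deserialization. A plain-JVM sketch of that idiom, with made-up names (Holder, roundTrip) and no Spark involved:

```scala
import java.io._

// Made-up example class: `doubled` is skipped during serialization
// (@transient) and, being lazy, is recomputed from `n` on first access
// after deserialization.
class Holder(val n: Int) extends Serializable {
  @transient lazy val doubled: Int = n * 2
}

// Serialize to bytes and back, roughly what Spark does to ship objects
// to executors.
def roundTrip[T](obj: T): T = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(obj)
  oos.close()
  val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
  ois.readObject().asInstanceOf[T]
}

val h = new Holder(21)
println(h.doubled)             // forced on the sending side
println(roundTrip(h).doubled)  // recomputed on the receiving side from n
```

A plain lazy val that has already been forced serializes its computed value; the @transient variant forces recomputation after deserialization, which is why this pattern shows up for members that cannot travel over the wire.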
And this is where Spark gets frustrating to work with: if the original version (without lazy) is compiled and run using spark-submit, I get the desired/expected result:
$ /usr/bin/spark-submit --master local[4] --driver-memory 1024m --name "TEST" --class Test test.jar 2> err
-- init Test singleton f1=1 f2=2
-- main starting f1=1 f2=2
111111112222
I really don't like having to write code differently to work in the spark-shell, but since the shell is so convenient, I do it. These kinds of nuances cost me a lot of time and effort, though. The above shows the salient parts of a 2000-line program; it took me hours to figure out where in the code the shell was doing something different than the compiled version.
In the spark-shell, Spark tries to serialize the objects themselves instead of sending their class definitions to the executors. In your case the Consts object is not serializable, so an empty object is sent to the executors and its values arrive as null (or 0 for primitive fields).
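The reason extends Serializable helps is that a Serializable type's field values travel inside the serialized bytes, rather than the receiving side being handed an empty shell. A plain-JVM sketch with a made-up stand-in type (Consts2) and helper (roundTrip), no Spark involved:

```scala
import java.io._

// Made-up stand-in for Consts: case classes are Serializable by default,
// so `f2` is written into the byte stream and restored on the other side.
case class Consts2(f2: Int)

// Serialize to bytes and back, roughly what happens between driver
// and executors.
def roundTrip[T](obj: T): T = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(obj)
  oos.close()
  val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
  ois.readObject().asInstanceOf[T]
}

println(roundTrip(Consts2(2)).f2)  // 2: the value traveled with the bytes
```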
I couldn't find a proper workaround for Spark 2.x, but the fix below works fine with Spark 3:
test.scala
import org.apache.spark.sql.SparkSession
@SerialVersionUID(123L)
object Consts extends Serializable {
  val f2 = 2
}
object Test extends Serializable {
  val f1 = 1
  println(s"-- init Test singleton f1=${f1} f2=${Consts.f2}")
  def doWorkWithF1(x: Int) = {
    f1
  }
  def doPartitionWorkWithF1(partitionId: Int, iter: Iterator[Int]) = {
    iter.map(x => f1)
  }
  def doPartitionWorkWithF2(partitionId: Int, iter: Iterator[Int]) = {
    iter.map(x => Consts.f2)
  }
  def main(args: Array[String]) {
    println(s"-- main starting f1=${f1} f2=${Consts.f2}")
    val spark = SparkSession.builder().getOrCreate()
    val rdd = spark.sparkContext.parallelize(List(1,2,3,4))
    rdd.map(doWorkWithF1).foreach(print)
    rdd.mapPartitionsWithIndex(doPartitionWorkWithF1).foreach(print)
    rdd.mapPartitionsWithIndex(doPartitionWorkWithF2).foreach(print)
  }
}
Terminal output:
➜ ~ ~/Downloads/spark-3.3.2-bin-hadoop3/bin/spark-shell --master "local[4]"
23/03/02 03:16:24 WARN Utils: Your hostname, my.local resolves to a loopback address: 127.0.0.1; using 172.17.4.232 instead (on interface en0)
23/03/02 03:16:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/03/02 03:16:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://172.17.4.232:4040
Spark context available as 'sc' (master = local[*], app id = local-1677726988083).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.3.2
/_/
Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 1.8.0_362)
Type in expressions to have them evaluated.
Type :help for more information.
scala> :paste "test.scala"
Pasting file test.scala...
import org.apache.spark.sql.SparkSession
defined object Consts
defined object Test
scala> Test.main(Array())
-- init Test singleton f1=1 f2=2
-- main starting f1=1 f2=2
11112222
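For Spark 2.x, a commonly used workaround (sketched here reusing the question's method name, though I haven't verified it against every shell version) is to copy the value into a local val before building the closure, so the serialized closure carries just an Int and never references Consts or the shell's wrapper objects:

```scala
// Sketch reusing the question's names; the local copy is the only change.
object Consts {
  val f2 = 2
}

def doPartitionWorkWithF2(partitionId: Int, iter: Iterator[Int]): Iterator[Int] = {
  val f2 = Consts.f2  // read on the driver, before the closure is serialized
  iter.map(x => f2)   // the closure now closes over the local Int only
}

println(doPartitionWorkWithF2(0, Iterator(1, 2, 3, 4)).mkString)  // 2222
```

This runs unchanged under spark-shell and spark-submit, since nothing in the shipped closure depends on how the shell wraps top-level objects.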