There is no problem in the local environment, but an exception occurs when running spark-submit. The approximate code is as follows:
class Test extends Serializable {
  def action() = {
    val sc = SparkContext.getOrCreate(sparkConf)
    val rdd1 = sc.textFile(.. )
    val rdd2 = rdd1.map( logLine => {
      // gson
      val parsedJson = jsonParser.parse(logLine).getAsJsonObject
      // jackson (alternative)
      // val parsedJson = objectMapper.readValue(logLine, classOf[HashMap[String, String]])
      MyDataSet(parsedJson.get("field1"), parsedJson.get("field2"), ...)
    })
  }
}
Exception
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
ensureSerializable(ClosureCleaner.scala:444)..
........
........
Caused by: java.io.NotSerializableException: com.fasterxml.jackson.module.scala.modifiers.ScalaTypeModifier
I have tried both the gson and jackson libraries. Isn't this a problem that can be solved just by extending Serializable?
The exception NotSerializableException is pretty self-explanatory: your task is not serializable. Spark is a parallel computing engine. The driver (where your main program is executed) ships the transformations you want to apply to the RDD (the code written inside map functions) to the executors, where they are executed. Therefore those transformations need to be serializable. In your case, jsonParser and objectMapper are created on the driver. To use them inside a transformation, Spark tries to serialize them and fails because they are not serializable. That's your error. I don't know which one is not serializable; maybe both.
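To see concretely why capturing a non-serializable object breaks the task, here is a minimal, Spark-free sketch that mimics the check Spark's ClosureCleaner performs (the names Parser, SerializationCheck and isSerializable are made up for illustration; Parser stands in for something like Gson's JsonParser or Jackson's ObjectMapper):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// hypothetical stand-in for a non-serializable helper (note: no Serializable)
class Parser {
  def parse(s: String): String = s.toUpperCase
}

object SerializationCheck {
  // roughly what Spark does before shipping a task:
  // attempt Java serialization and report whether it works
  def isSerializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val parser = new Parser

    // a closure that captures `parser`; serializing the function
    // also serializes everything it captures
    val fn: String => String = line => parser.parse(line)

    println(isSerializable("hello")) // true: String is Serializable
    println(isSerializable(parser))  // false: Parser is not
    println(isSerializable(fn))      // false: it drags `parser` along
  }
}
```

The third call is exactly the situation in the question: the lambda itself is fine, but the non-serializable helper it captures sinks it.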
Let's take an example and see how we can fix it.
// let's create a non-serializable class
class Stuff(val i: Int) {
  def get() = i
}

// we instantiate it in the driver
val stuff = new Stuff(4)

// this fails with "Caused by: java.io.NotSerializableException: Stuff"
val result = sc.parallelize(Seq(1, 2, 3)).map(x => (x, stuff.get)).collect
To fix it, let's create the object inside the transformation:
val result = sc.parallelize(Seq(1, 2, 3))
  .map(x => {
    val new_stuff = new Stuff(4)
    (x, new_stuff.get)
  }).collect
It works, but obviously, creating the object for every record can be quite expensive. We can do better with mapPartitions and create the object only once per partition:
val result = sc.parallelize(Seq(1, 2, 3))
  .mapPartitions(part => {
    val new_stuff = new Stuff(4)
    part.map(x => (x, new_stuff.get))
  }).collect
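One more pattern worth mentioning: keep the helper in a singleton object behind a lazy val. Singleton objects are not serialized with the closure, only referenced by class name, so each executor JVM builds its own instance on first access. This is a sketch reusing the Stuff class from above; StuffHolder is a made-up name:

```scala
// the helper lives in a singleton, initialized once per executor JVM
object StuffHolder {
  lazy val stuff = new Stuff(4)
}

val result = sc.parallelize(Seq(1, 2, 3))
  .map(x => (x, StuffHolder.stuff.get))
  .collect
```

This avoids even the once-per-partition cost, at the price of sharing one instance across all tasks in the JVM, so the helper should be thread-safe (Jackson's ObjectMapper is; check before applying this to other libraries).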