Tags: json, scala, apache-spark, serialization

An exception occurs when Spark converts a JSON string to a HashMap


There is no problem in the local environment, but an exception occurs when performing spark-submit.

The approximate code is as follows:

class Test extends Serializable {
    def action() = {
        val sc = SparkContext.getOrCreate(sparkConf)
        val rdd1 = sc.textFile(..)
        val rdd2 = rdd1.map(logLine => {
            // gson variant:
            // val parsedJson = jsonParser.parse(logLine).getAsJsonObject
            // jackson variant:
            val parsedJson = objectMapper.readValue(logLine, classOf[HashMap[String, String]])
            MyDataSet(parsedJson.get("field1"), parsedJson.get("field2"), ...)
        })
    }
}

Exception

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        ensureSerializable(ClosureCleaner.scala:444)
        ...
        ...
Caused by: java.io.NotSerializableException: com.fasterxml.jackson.module.scala.modifiers.ScalaTypeModifier

I have tried both the gson and jackson libraries.

Isn't this a problem that can be solved just by extending Serializable?


Solution

  • The exception NotSerializableException is self-explanatory: your task is not serializable. Spark is a parallel computing engine. The driver (where your main program runs) ships the transformations you want to apply to the RDD (the code written inside map functions) to the executors, where they are executed. Those transformations therefore need to be serializable. In your case, jsonParser and objectMapper are created on the driver; to use them inside a transformation, Spark tries to serialize them and fails because they are not serializable. That's your error (I don't know which of the two is the culprit, maybe both). Note that making your own class extend Serializable does not help here, because the non-serializable objects are the third-party parsers themselves, not your class.

    Let's take an example and see how we can fix it.

    // let's create a non-serializable class
    class Stuff(val i: Int) {
        def get() = i
    }
    // we instantiate it in the driver
    val stuff = new Stuff(4)

    // this fails: "Caused by: java.io.NotSerializableException: Stuff"
    val result = sc.parallelize(Seq(1, 2, 3)).map(x => (x, stuff.get)).collect
    

    To fix it, let's create the object inside the transformation:

    val result = sc.parallelize(Seq(1, 2, 3))
        .map(x => {
            val new_stuff = new Stuff(4)
            (x, new_stuff.get)
        }).collect
    

    It works, but creating the object for every record can be quite expensive. We can do better with mapPartitions and create the object only once per partition:

    val result = sc.parallelize(Seq(1, 2, 3))
        .mapPartitions(part => {
            val new_stuff = new Stuff(4)
            part.map(x => (x, new_stuff.get))
        }).collect
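
    Applied back to the question, the same mapPartitions pattern lets you build the Jackson ObjectMapper once per partition on the executors, so nothing non-serializable has to be shipped from the driver. This is a sketch, not a drop-in fix: MyDataSet and the field names come from the question, and reading into java.util.HashMap is an assumption.

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    val rdd2 = rdd1.mapPartitions(partition => {
        // created once per partition, directly on the executor,
        // so the mapper is never serialized with the closure
        val mapper = new ObjectMapper()
        mapper.registerModule(DefaultScalaModule)
        partition.map(logLine => {
            val parsedJson = mapper.readValue(logLine, classOf[java.util.HashMap[String, String]])
            MyDataSet(parsedJson.get("field1"), parsedJson.get("field2"))
        })
    })

    Another option, since your Test class already extends Serializable, is to keep the mapper as a field but mark it @transient lazy val: the field is then skipped during serialization and re-created lazily on each executor the first time it is used. Again a sketch under the same assumptions:

    class Test extends Serializable {
        // excluded from serialization; rebuilt lazily on each JVM that uses it
        @transient lazy val objectMapper: ObjectMapper = {
            val m = new ObjectMapper()
            m.registerModule(DefaultScalaModule)
            m
        }
    }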