
Spark Scala programming with non-serializable objects and functions


I have a "Task not serializable" Exception when I run a Spark Scala program with

  • Spark RDDs is of not serializable type (java class)
  • called functions are from a not serializable class (java class, again)

My code looks something like this:

object Main {
    def main(args: Array[String]): Unit = {
        ...
        var rdd = sc.textFile(filename)
                    .map(line => new NotSerializableJClass(line)).cache()
        // rdd is RDD[NotSerializableJClass]
        ...
        val test = new NotSerializableJPredicate()
        rdd = rdd.filter(elem => test.test(elem))
        // throws TaskNotSerializable on the predicate class
    }
}

I noticed that I can resolve the second problem with

rdd = rdd.filter(elem => (new NotSerializableJPredicate()).test(elem))

but I still get that exception for the class of the objects in the RDD. I would also like to solve the second part differently, because I don't want to create a huge number of predicate objects.

Can you help me? How can I work with these non-serializable classes?


Solution

  • The objects inside an RDD must be serializable, so you cannot create an RDD of a non-serializable class.

    For your predicate, you can express the filter with mapPartitions.

    rdd.mapPartitions { part =>
      val test = new NotSerializableJPredicate()
      part.filter(elem => test.test(elem))
    }
    

    mapPartitions runs its body once per partition, so it lets you instantiate non-serializable classes on the executor, and it only needs to do so once per partition rather than once per record.
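
The same pattern also covers the first part of your problem (the RDD element type itself): keep the RDD in a serializable representation (here, the raw String lines) and construct the non-serializable objects only inside mapPartitions, where they live entirely on the executor. Here is a minimal sketch, assuming NotSerializableJClass can be rebuilt from a line and that toString (a stand-in for whatever method yields the serializable result you actually need downstream) is what you want to collect:

    // Keep the RDD in a serializable form (plain Strings), safe to cache.
    val lines = sc.textFile(filename).cache()

    val results = lines.mapPartitions { part =>
      // Both non-serializable objects exist only on the executor;
      // nothing here is shipped from the driver.
      val test = new NotSerializableJPredicate() // one instance per partition
      part
        .map(line => new NotSerializableJClass(line)) // constructed executor-side
        .filter(elem => test.test(elem))
        .map(elem => elem.toString) // back to a serializable type before collect/shuffle
    }

As long as the non-serializable objects never cross a stage boundary (no caching of an RDD[NotSerializableJClass], no shuffle of one), Spark never needs to serialize them: they are created and discarded within the iterator pipeline of each partition.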