I get a "Task not serializable" exception when I run a Spark Scala program that uses non-serializable classes.
My code looks something like this:
object Main {
  def main(args: Array[String]): Unit = {
    ...
    var rdd = sc.textFile(filename)
      .map(line => new NotSerializableJClass(line)).cache()
    // rdd is RDD[NotSerializableJClass]
    ...
    val test = new NotSerializableJPredicate()
    rdd = rdd.filter(elem => test.test(elem))
    // throws "Task not serializable" because of the captured test predicate
  }
}
I noticed that I can work around the second part with
    rdd = rdd.filter(elem => (new NotSerializableJPredicate()).test(elem))
but I still get the exception for the class of the objects in the RDD. I would also like to solve the second part in a different way, because I don't want to create a large number of NotSerializableJPredicate objects.
Can you help me? How can I proceed with non-serializable classes?
The elements of an RDD must be serializable, so you cannot create an RDD of a non-serializable class.
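If it is unclear which class is the problem, a quick local check can help. The sketch below is Spark-free and only tests plain Java serialization, which Spark uses by default for closures; `PlainRecord`, `SerializableRecord` and `isJavaSerializable` are hypothetical names made up for this illustration, not part of Spark or of the code in the question.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-ins for the classes in the question.
class PlainRecord(val line: String)          // does not extend Serializable
case class SerializableRecord(line: String)  // case classes are Serializable

object SerializationCheck {
  // Returns true if Java serialization accepts the object,
  // false if it throws NotSerializableException.
  def isJavaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    println(isJavaSerializable(new PlainRecord("a")))    // false
    println(isJavaSerializable(SerializableRecord("a"))) // true
  }
}
```

If the non-serializable class only wraps a few simple fields, copying them into a small case class like the one above is one possible way to get an element type that Spark can ship around; whether that fits depends on what the class actually holds.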
For your predicate, you could use mapPartitions:
    rdd.mapPartitions { part =>
      val test = new NotSerializableJPredicate()
      part.filter { elem => test.test(elem) }
    }
mapPartitions runs the function once per partition, so the non-serializable class is instantiated on the executor, and only once per partition rather than once per record.
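To make the difference in object count concrete, here is a minimal Spark-free sketch that models two partitions as plain Scala sequences and counts constructor calls. `Counter`, `CountingPredicate` and `PerPartitionDemo` are hypothetical names for this illustration only; the real predicate class and a real RDD would take their place.

```scala
// Hypothetical stand-in for the predicate class in the question;
// it counts how many instances get constructed.
object Counter { var created = 0 }

class CountingPredicate {
  Counter.created += 1
  def test(s: String): Boolean = s.nonEmpty
}

object PerPartitionDemo {
  // Two "partitions" modelled as plain sequences (no Spark involved).
  val partitions: Seq[Seq[String]] = Seq(Seq("a", "", "b"), Seq("c", "d"))

  // One predicate per record, like filter(elem => new Predicate().test(elem)).
  def perRecord(): (Seq[String], Int) = {
    Counter.created = 0
    val kept = partitions.flatMap(_.filter(elem => new CountingPredicate().test(elem)))
    (kept, Counter.created)
  }

  // One predicate per partition, like the mapPartitions version.
  def perPartition(): (Seq[String], Int) = {
    Counter.created = 0
    val kept = partitions.flatMap { part =>
      val test = new CountingPredicate()
      part.filter(elem => test.test(elem))
    }
    (kept, Counter.created)
  }

  def main(args: Array[String]): Unit = {
    println(perRecord()._2)    // 5 instances, one per record
    println(perPartition()._2) // 2 instances, one per partition
  }
}
```

Both variants keep the same elements; only the number of predicate instances changes, which matters when the constructor is expensive.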