Tags: serialization, apache-spark, jmstemplate

Task Not Serializable Exception - When using JMSTemplate in Spark foreach


I am trying to use the Spring JmsTemplate class inside an rdd.foreach call, but I am getting a "Task not serializable" error. When I tried a static variable instead, it worked in local mode, but on the cluster I get a NullPointerException.

Sample Code:

inputRDD.foreach(record -> {
    messageServices.send(record);
});

Error log:

org.apache.spark.SparkException: Task not serializable
       at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
       at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
       at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
       at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
       at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:869)
       at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868)
       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
       at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
       at org.apache.spark.rdd.RDD.foreach(RDD.scala:868)
       at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:327)
       at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:47)
       at com.messenger.MessengerDriver.runJob(MessengerDriver.java:108)
       at com.messenger.MessengerDriver.main(MessengerDriver.java:60)
Caused by: java.io.NotSerializableException: org.springframework.jms.core.JmsTemplate
Serialization stack:
       - object not serializable (class: org.springframework.jms.core.JmsTemplate, value: org.springframework.jms.core.JmsTemplate@3b98b809)
       - field (class: com.messenger.Messenger.activemq.MessageProducer, name: jmsTemplate, type: class org.springframework.jms.core.JmsTemplate)
       - object (class com.messenger.Messenger.activemq.MessageProducer, com.messenger.Messenger.activemq.MessageProducer@662e682a)
       - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
       - field (class: org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1, name: f$14, type: interface org.apache.spark.api.java.function.VoidFunction)
       - object (class org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1, <function1>)
       at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
       at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
       at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
       at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
       ... 13 more

Has anyone faced the same kind of issue?


Solution

  • The proper pattern is to use repartition together with mapPartitions:
    repartition resizes the RDD into a suitable number of partitions;
    mapPartitions handles each partition separately, so you can create one JmsTemplate per partition inside the function you pass. Because the template is constructed on the executor rather than captured in the driver's closure, it never needs to be serialized.
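A minimal sketch of that pattern, here using foreachPartition (the side-effecting counterpart of mapPartitions) since the goal is sending messages rather than producing a new RDD. The createJmsTemplate() factory, the queue name, and the partition count of 10 are placeholders for your own setup:

```java
import java.util.Iterator;

import org.apache.spark.api.java.JavaRDD;
import org.springframework.jms.core.JmsTemplate;

public class PartitionedSender {

    public static void send(JavaRDD<String> inputRDD) {
        inputRDD
            // Pick a partition count that suits your cluster and broker;
            // 10 here is only an illustrative value.
            .repartition(10)
            .foreachPartition((Iterator<String> records) -> {
                // Constructed on the executor, once per partition, so the
                // non-serializable JmsTemplate is never shipped from the driver.
                JmsTemplate jmsTemplate = createJmsTemplate(); // placeholder factory
                while (records.hasNext()) {
                    jmsTemplate.convertAndSend("my.queue", records.next());
                }
            });
    }

    // Hypothetical helper: build a JmsTemplate from a locally created
    // ConnectionFactory (e.g. ActiveMQ) on the executor side.
    private static JmsTemplate createJmsTemplate() {
        throw new UnsupportedOperationException("wire up your ConnectionFactory here");
    }
}
```

The key point is that everything non-serializable lives entirely inside the lambda passed to foreachPartition, so Spark's ClosureCleaner finds nothing to serialize beyond the lambda itself.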