I am trying to send Java String messages to Kafka with a Kafka producer. The String messages are extracted from a Spark JavaPairDStream:
JavaPairDStream<String, String> processedJavaPairStream = inputStream
    .mapToPair(record -> new Tuple2<>(record.key(), record.value()))
    .mapValues(message -> message.replace('>', '#'));
String outTopics = "outputTopic";
String broker = "localhost:9092";
Properties properties = new Properties();
properties.put("bootstrap.servers", broker);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(properties);
processedJavaPairStream.foreachRDD(rdd -> rdd.foreach(tuple2 -> {
ProducerRecord<String, String> message = new ProducerRecord<String, String>(outTopics, tuple2._1, tuple2._2);
System.out.println(message.key() + " : " + message.value()); //(1)
producer.send(message).get(); //(2)
}));
Line (1) prints the message string correctly. But when I send these messages with the Kafka producer at line (2), it throws the exception below:
Caused by: java.io.NotSerializableException: org.apache.kafka.clients.producer.KafkaProducer
Serialization stack:
- object not serializable (class: org.apache.kafka.clients.producer.KafkaProducer, value: org.apache.kafka.clients.producer.KafkaProducer@10f6530d)
- element of array (index: 1)
- array (class [Ljava.lang.Object;, size 2)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class com.aaa.StreamingKafkaDataWithSpark.streaming.SimpleDStreamExample, functionalInterfaceMethod=org/apache/spark/api/java/function/VoidFunction.call:(Ljava/lang/Object;)V, implementation=invokeStatic com/aaa/StreamingKafkaDataWithSpark/streaming/SimpleDStreamExample.lambda$3:(Ljava/lang/String;Lorg/apache/kafka/clients/producer/Producer;Lscala/Tuple2;)V, instantiatedMethodType=(Lscala/Tuple2;)V, numCaptured=2])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class com.aaa.StreamingKafkaDataWithSpark.streaming.SimpleDStreamExample$$Lambda$941/0x000000010077e440, com.aaa.StreamingKafkaDataWithSpark.streaming.SimpleDStreamExample$$Lambda$941/0x000000010077e440@7bb180e1)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=interface org.apache.spark.api.java.JavaRDDLike, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/api/java/JavaRDDLike.$anonfun$foreach$1$adapted:(Lorg/apache/spark/api/java/function/VoidFunction;Ljava/lang/Object;)Ljava/lang/Object;, instantiatedMethodType=(Ljava/lang/Object;)Ljava/lang/Object;, numCaptured=1])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class org.apache.spark.api.java.JavaRDDLike$$Lambda$942/0x000000010077c840, org.apache.spark.api.java.JavaRDDLike$$Lambda$942/0x000000010077c840@7fb1cd32)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
... 30 more
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.rdd.RDD.$anonfun$foreach$1(RDD.scala:926)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:925)
at org.apache.spark.api.java.JavaRDDLike.foreach(JavaRDDLike.scala:351)
at org.apache.spark.api.java.JavaRDDLike.foreach$(JavaRDDLike.scala:350)
at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:45)
at com.aaa.StreamingKafkaDataWithSpark.streaming.SimpleDStreamExample.lambda$2(SimpleDStreamExample.java:72)
at org.apache.spark.streaming.api.java.JavaDStreamLike.$anonfun$foreachRDD$1(JavaDStreamLike.scala:272)
at org.apache.spark.streaming.api.java.JavaDStreamLike.$anonfun$foreachRDD$1$adapted(JavaDStreamLike.scala:272)
at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2(DStream.scala:628)
at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2$adapted(DStream.scala:628)
at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:257)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:257)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
I cannot understand this exception. Line (1) confirms that the producer messages are of type <String, String>. So why does line (2) throw this exception? Am I missing a step?
You need to create the producer inside the foreachRDD call (or, better, inside foreachPartition) so that it is instantiated on the executors. RDDs are distributed over multiple executors, and a KafkaProducer object cannot be serialized on the driver and shipped to them; that is exactly what the Task not serializable exception is telling you.
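For example, a minimal sketch of the foreachPartition approach (the producer configuration is the same as yours; creating and closing one producer per partition keeps the example simple, while a pooled or lazily-initialized producer would be more efficient in production):

processedJavaPairStream.foreachRDD(rdd -> rdd.foreachPartition(partition -> {
    // The producer is created inside the closure, on the executor,
    // so nothing non-serializable is captured from the driver.
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");

    try (Producer<String, String> producer = new KafkaProducer<>(props)) {
        while (partition.hasNext()) {
            Tuple2<String, String> tuple2 = partition.next();
            producer.send(new ProducerRecord<>("outputTopic", tuple2._1, tuple2._2));
        }
        producer.flush();
    }
}));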
Alternatively, look at the Structured Streaming documentation: with its built-in Kafka sink you can simply write the stream to a topic, with no need to create and send records yourself.
stream.writeStream().format("kafka")...
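A rough sketch of that approach, reading from Kafka and writing back to Kafka (requires the spark-sql-kafka-0-10 package on the classpath; topic names and the checkpoint location are placeholders):

// spark is an existing SparkSession
Dataset<Row> input = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "inputTopic")
        .load();

// Kafka gives key/value as binary; cast to String and apply the same
// '>' -> '#' replacement using the SQL translate function.
StreamingQuery query = input
        .selectExpr("CAST(key AS STRING) AS key",
                    "translate(CAST(value AS STRING), '>', '#') AS value")
        .writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "outputTopic")
        .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")
        .start();

query.awaitTermination();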
Note that if the goal is simply to map one topic into another, the Kafka Streams API is much simpler and has less overhead than Spark.
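For instance, the whole topic-to-topic transformation above is roughly this much Kafka Streams code (the application id and topic names are assumptions):

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "replace-char-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

StreamsBuilder builder = new StreamsBuilder();
// Read from the input topic, replace '>' with '#', write to the output topic.
builder.stream("inputTopic", Consumed.with(Serdes.String(), Serdes.String()))
       .mapValues(value -> value.replace('>', '#'))
       .to("outputTopic", Produced.with(Serdes.String(), Serdes.String()));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();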