I am a beginner with Apache Kafka. The code samples below are my Kafka producer and consumer.
Kafka producer code:
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public static void main(String[] args) throws Exception {
    String inputTopic = "inputTopic";
    String broker = "localhost:9092";

    Properties properties = new Properties();
    properties.put("bootstrap.servers", broker);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(properties);

    String message = " >>>>>>>>>>>>>>>>>>>>> Data Message";
    int key = 0;
    while (key < 1) {
        key = key + 1;
        ProducerRecord<String, String> record =
                new ProducerRecord<>(inputTopic, String.valueOf(key), message + " " + key);
        producer.send(record).get(); // block until the broker acknowledges the record
    }
    producer.close();
}
And the following is my Kafka consumer, which uses Spark Streaming.
Kafka consumer code:
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("SimpleDStreamExample");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    Collection<String> topics = Arrays.asList("inputTopic");
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "localhost:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "spark-demo");
    kafkaParams.put("kafka.consumer.id", "kafka-consumer-01");
    kafkaParams.put("auto.offset.reset", "earliest");
    kafkaParams.put("enable.auto.commit", false);

    JavaStreamingContext ssc = new JavaStreamingContext(jsc, new Duration(1000));
    JavaInputDStream<ConsumerRecord<String, String>> inputStream = KafkaUtils.createDirectStream(
            ssc, LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
    inputStream.print();

    ssc.start();
    ssc.awaitTermination();
    ssc.close();
}
But the Kafka consumer throws the following exception:
19/09/16 20:57:58 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = inputTopic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1568634372952, serialized key size = 1, serialized value size = 38, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value = >>>>>>>>>>>>>>>>>>>>> Data Message 1))
- element of array (index: 0)
- array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 2)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:456)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
19/09/16 20:57:58 ERROR TaskSetManager: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = inputTopic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1568634372952, serialized key size = 1, serialized value size = 38, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value = >>>>>>>>>>>>>>>>>>>>> Data Message 1))
- element of array (index: 0)
- array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 2); not retrying
19/09/16 20:57:58 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
I cannot understand the cause of this exception. I have confirmed that the producer sends <String, String> messages, so why are the consumed records not serializable?
If you compare your code to the example given in the documentation, you have not extracted any data from the ConsumerRecord. ConsumerRecord itself is not serializable, so its data cannot be collected to the driver and printed. Extract the fields you need first, for example:
JavaPairDStream<String, String> outStream =
        inputStream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
outStream.print();
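If you only need the message payloads, a minimal sketch (assuming the same inputStream as above, and importing org.apache.spark.streaming.api.java.JavaDStream) would be to map each record to its value, which is a plain, serializable String:

// Minimal sketch: keep only the String value of each ConsumerRecord before printing,
// so nothing non-serializable has to be shipped back to the driver.
JavaDStream<String> values = inputStream.map(ConsumerRecord::value);
values.print();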