Producer properties
spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
Consumer properties
spring.kafka.consumer.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.group-id=user-group
server.port=8085
Consumer Service
@Service
public class UserConsumerService {

    @KafkaListener(topics = { "user-topic" })
    public void consumerUserData(User user) {
        System.out.println("Users Age Is: " + user.getAge() + " Fav Genre " + user.getFavGenre());
    }
}
Producer Service
@Service
public class UserProducerService {

    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    public void sendUserData(User user) {
        kafkaTemplate.send("user-topic", user.getName(), user);
    }
}
Producer Config for creating topic
@Configuration
public class KafkaConfig {

    @Bean
    public NewTopic topicOrder() {
        return TopicBuilder.name("user-topic").partitions(2).replicas(1).build();
    }
}
The producer works fine, but the consumer fails with this error:
2021-12-06 21:45:50.299 ERROR 4936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
    at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:149) ~[spring-kafka-2.8.0.jar:2.8.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1760) ~[spring-kafka-2.8.0.jar:2.8.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1283) ~[spring-kafka-2.8.0.jar:2.8.0]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition user-topic-0 at offset 1. If needed, please seek past the record to continue consumption.
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:134) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1652) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1800(Fetcher.java:1488) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:721) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:672) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1277) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.0.0.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1507) ~[spring-kafka-2.8.0.jar:2.8.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1497) ~[spring-kafka-2.8.0.jar:2.8.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1325) ~[spring-kafka-2.8.0.jar:2.8.0]
I am new to Kafka and trying to figure out why I'm getting this error. How do I fix this?
Does the error message not tell you anything?
This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
See the documentation:
When a deserializer fails to deserialize a message, Spring has no way to handle the problem, because it occurs before the poll() returns. To solve this problem, the ErrorHandlingDeserializer has been introduced. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer returns a null value and a DeserializationException in a header that contains the cause and the raw bytes. When you use a record-level MessageListener, if the ConsumerRecord contains a DeserializationException header for either the key or value, the container’s ErrorHandler is called with the failed ConsumerRecord. The record is not passed to the listener. […]
You can use the DefaultKafkaConsumerFactory constructor that takes key and value Deserializer objects and wire in appropriate ErrorHandlingDeserializer instances that you have configured with the proper delegates. Alternatively, you can use consumer configuration properties (which are used by the ErrorHandlingDeserializer) to instantiate the delegates. The property names are ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS and ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS. The property value can be a class or class name. The following example shows how to set these properties:
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey");
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example");
return new DefaultKafkaConsumerFactory<>(props);
With Boot:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
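To make the delegation described in the quoted documentation more concrete, here is a minimal plain-Java sketch of the idea, with no Kafka dependency. It is not the spring-kafka implementation: SafeDeserializer, Failure, the header name and parseAge are hypothetical names invented for this illustration, and the real ErrorHandlingDeserializer stores a DeserializationException in a record header rather than in a plain map.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class ErrorHandlingSketch {

    // Hypothetical strict delegate: throws NumberFormatException on bad input.
    static Integer parseAge(byte[] data) {
        return Integer.valueOf(new String(data, StandardCharsets.UTF_8).trim());
    }

    public static void main(String[] args) {
        SafeDeserializer<Integer> deserializer =
                new SafeDeserializer<>(ErrorHandlingSketch::parseAge);

        for (String payload : new String[] { "42", "not-a-number" }) {
            Map<String, Object> headers = new HashMap<>();
            Integer age = deserializer.deserialize(
                    payload.getBytes(StandardCharsets.UTF_8), headers);

            // Mimics the container's choice: a failed record goes to the
            // error handler and never reaches the listener.
            if (headers.containsKey(SafeDeserializer.ERROR_HEADER)) {
                System.out.println("error handler gets: " + payload);
            } else {
                System.out.println("listener gets: " + age);
            }
        }
    }
}

// Hypothetical stand-in for the wrapper idea; not the library's class.
final class SafeDeserializer<T> {

    // Assumed header name, chosen only for this sketch.
    static final String ERROR_HEADER = "sketchDeserializerException";

    private final Function<byte[], T> delegate;

    SafeDeserializer(Function<byte[], T> delegate) {
        this.delegate = delegate;
    }

    // Returns the delegate's value, or null after recording the failure.
    T deserialize(byte[] data, Map<String, Object> headers) {
        try {
            return delegate.apply(data);
        } catch (RuntimeException e) {
            // Keep the cause and the raw bytes so an error handler can inspect them.
            headers.put(ERROR_HEADER, new Failure(e, data));
            return null;
        }
    }

    static final class Failure {
        final Exception cause;
        final byte[] rawBytes;

        Failure(Exception cause, byte[] rawBytes) {
            this.cause = cause;
            this.rawBytes = rawBytes;
        }
    }
}
```

Running main prints "listener gets: 42" followed by "error handler gets: not-a-number". The point of the wrapper is that the exception no longer escapes from the deserialization step, so in spring-kafka a bad record such as the one at offset 1 above can be handed to the container's error handler instead of failing inside poll().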