Search code examples
spring-bootapache-kafkadeserializationspring-kafkajson-deserialization

Serialization Exception in Spring-kafka Consumer Exception


When sending any message to my SpringBoot Kafka application, I am facing the Serialization Exception, here is the log.

2021-02-24 01:28:21.280  INFO 19249 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.0
2021-02-24 01:28:21.281  INFO 19249 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 62abe01bee039651
2021-02-24 01:28:21.281  INFO 19249 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1614110301280
2021-02-24 01:28:21.294  INFO 19249 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: paGjVV-5RVyOatWyXOzBrQ
2021-02-24 01:28:21.365  INFO 19249 --- [ntainer#0-0-C-1] c.h.a.m.service.KafkaConsumer            : #### -> Consumed message -> "22:26 Hello World!!"
2021-02-24 01:28:21.400 ERROR 19249 --- [ntainer#1-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145) ~[spring-kafka-2.6.6.jar:2.6.6]
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113) ~[spring-kafka-2.6.6.jar:2.6.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1427) ~[spring-kafka-2.6.6.jar:2.6.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1124) ~[spring-kafka-2.6.6.jar:2.6.6]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition myTopic-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[34, 50, 50, 58, 50, 54, 32, 72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, 33, 33, 32, 32, 74, 97, 105, 32, 83, 104, 114, 105, 32, 82, 97, 109, 33, 33, 32, 66, 97, 114, 97, 109, 98, 97, 97, 114, 32, 74, 97, 105, 32, 72, 111, 33, 33, 34]] from topic [myTopic]
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `com.hcl.anusheel.messagestream.request.dto.EntryObject` (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value ('22:26 Hello World!!')
 at [Source: (byte[])""22:26 Hello World!!""; line: 1, column: 1]
    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63) ~[jackson-databind-2.11.4.jar:2.11.4]
    at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1455) ~[jackson-databind-2.11.4.jar:2.11.4]
    at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1081) ~[jackson-databind-2.11.4.jar:2.11.4]
    at com.fasterxml.jackson.databind.deser.ValueInstantiator._createFromStringFallbacks(ValueInstantiator.java:371) ~[jackson-databind-2.11.4.jar:2.11.4]
    at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromString(StdValueInstantiator.java:323) ~[jackson-databind-2.11.4.jar:2.11.4]
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromString(BeanDeserializerBase.java:1408) ~[jackson-databind-2.11.4.jar:2.11.4]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:176) ~[jackson-databind-2.11.4.jar:2.11.4]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:166) ~[jackson-databind-2.11.4.jar:2.11.4]
    at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:2079) ~[jackson-databind-2.11.4.jar:2.11.4]
    at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1555) ~[jackson-databind-2.11.4.jar:2.11.4]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:517) ~[spring-kafka-2.6.6.jar:2.6.6]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1365) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:130) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1596) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1308) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[kafka-clients-2.6.0.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1271) ~[spring-kafka-2.6.6.jar:2.6.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1162) ~[spring-kafka-2.6.6.jar:2.6.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075) ~[spring-kafka-2.6.6.jar:2.6.6]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

2021-02-24 01:28:21.412 ERROR 19249 --- [ntainer#1-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception

How to fix this?

Here is the EntryObject.java class, this is part of the incoming request object which would be serialized and fed to the Kafka topic and the same should be retrieved from the Kafka Consumer for further processing.

public class EntryObject {
    @NonNull
    private String tradeId;
    @NonNull
    private int version;
    @NonNull
    private String counterPartyId;
    @NonNull
    private String bookId;
    @JsonFormat(pattern = "dd/MM/yyyy") 
    @DateTimeFormat(pattern = "dd/MM/yyyy")
    private LocalDate maturityDate;
    @JsonFormat(pattern = "dd/MM/yyyy") 
    @DateTimeFormat(pattern = "dd/MM/yyyy")
    private LocalDate createdDate;
    private char expired;
    
    public String getTradeId() {
        return tradeId;
    }
    public void setTradeId(String tradeId) {
        this.tradeId = tradeId;
    }
    public int getVersion() {
        return version;
    }
    public void setVersion(int version) {
        this.version = version;
    }
    public String getCounterPartyId() {
        return counterPartyId;
    }
    public void setCounterPartyId(String counterPartyId) {
        this.counterPartyId = counterPartyId;
    }
    public String getBookId() {
        return bookId;
    }
    public void setBookId(String bookId) {
        this.bookId = bookId;
    }
    public LocalDate getMaturityDate() {
        return maturityDate;
    }
    public void setMaturityDate(LocalDate maturityDate) {
        this.maturityDate = maturityDate;
    }
    public LocalDate getCreatedDate() {
        return createdDate;
    }
    public void setCreatedDate(LocalDate createdDate) {
        this.createdDate = createdDate;
    }
    public char getExpired() {
        return expired;
    }
    public void setExpired(char expired) {
        this.expired = expired;
    }
    
    @Override
    public String toString() {
        return "EntryObject [tradeId=" + tradeId + ", version=" + version + ", counterPartyId=" + counterPartyId
                + ", bookId=" + bookId + ", maturityDate=" + maturityDate + ", createdDate=" + createdDate
                + ", expired=" + expired + "]";
    }
}

Here is my KafkaConsumerConfig.java class.

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    public ConsumerFactory<String, String> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(String groupId) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(groupId));
        return factory;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
        return kafkaListenerContainerFactory("foo");
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> barKafkaListenerContainerFactory() {
        return kafkaListenerContainerFactory("bar");
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> headersKafkaListenerContainerFactory() {
        return kafkaListenerContainerFactory("headers");
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> partitionsKafkaListenerContainerFactory() {
        return kafkaListenerContainerFactory("partitions");
    }

    
    public ConsumerFactory<String, EntryObject> entryObjectConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "entryObject");
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(EntryObject.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, EntryObject> entryObjectKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, EntryObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(entryObjectConsumerFactory());
        return factory;
    }
}

and here is my KafkaConsumer.java class

@Service
public class KafkaConsumer {
    
    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
    
    @KafkaListener(topics = "myTopic", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
    public void receive(String message) throws IOException {
        logger.info(String.format("#### -> Consumed message -> %s", message));
    }

    @KafkaListener(topics = "myTopic", containerFactory = "entryObjectKafkaListenerContainerFactory")
    public void receive(EntryObject entryObject) throws IOException {
        logger.info("received entryObject = '{}'", entryObject.toString());
    }
}

How to fix this exception? And have a smoothly running application.


Solution

  • Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of com.hcl.anusheel.messagestream.request.dto.EntryObject (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value ('22:26 Hello World!!')

    The Json is a JSON encoded string "...".

    Jackson is trying to find a constructor for entry object that looks like this...

    public EntryObject(String data) { ... }
    

    and there is no such CTOR.