Search code examples
spring-bootgroovydocker-composespring-kafka

Spring for Apache Kafka JSON Deserialization Exception Class Not Found


I'm trying to get a messages from Kafka topic, but for some reason I get the following error:

2022-06-28 14:17:52.044  INFO 1 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-api1-1, groupId=api1] Seeking to offset 1957 for partition ActiveProxySources-0
2022-06-28T14:17:52.688451744Z 2022-06-28 14:17:52.687 ERROR 1 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler   : Backoff none exhausted for ActiveProxySources-0@1957
2022-06-28T14:17:52.688499949Z 
2022-06-28T14:17:52.688511943Z org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.freeproxy.parser.model.kafka.KafkaMessage]; nested exception is java.lang.ClassNotFoundException: com.freeproxy.parser.model.kafka.KafkaMessage
2022-06-28T14:17:52.688544511Z  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2691) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688555996Z  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.checkDeser(KafkaMessageListenerContainer.java:2738) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688564633Z  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2612) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688573552Z  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2544) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688582961Z  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2429) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688591538Z  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2307) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688600362Z  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1981) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688610882Z  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1365) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688620353Z  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1356) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688629357Z  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1251) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688637662Z  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
2022-06-28T14:17:52.688646009Z  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
2022-06-28T14:17:52.688655783Z  at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
2022-06-28T14:17:52.688664349Z Caused by: org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.freeproxy.parser.model.kafka.KafkaMessage]; nested exception is java.lang.ClassNotFoundException: com.freeproxy.parser.model.kafka.KafkaMessage
2022-06-28T14:17:52.688674537Z  at org.springframework.kafka.support.serializer.SerializationUtils.deserializationException(SerializationUtils.java:150) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688683348Z  at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer.deserialize(ErrorHandlingDeserializer.java:204) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688699174Z  at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1420) ~[kafka-clients-3.0.1.jar!/:na]
2022-06-28T14:17:52.688707618Z  at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:134) ~[kafka-clients-3.0.1.jar!/:na]
2022-06-28T14:17:52.688718316Z  at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1652) ~[kafka-clients-3.0.1.jar!/:na]
2022-06-28T14:17:52.688728359Z  at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1800(Fetcher.java:1488) ~[kafka-clients-3.0.1.jar!/:na]
2022-06-28T14:17:52.688736716Z  at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:721) ~[kafka-clients-3.0.1.jar!/:na]
2022-06-28T14:17:52.688748228Z  at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:672) ~[kafka-clients-3.0.1.jar!/:na]
2022-06-28T14:17:52.688758573Z  at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1304) ~[kafka-clients-3.0.1.jar!/:na]
2022-06-28T14:17:52.688768278Z  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) ~[kafka-clients-3.0.1.jar!/:na]
2022-06-28T14:17:52.688776576Z  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.0.1.jar!/:na]
2022-06-28T14:17:52.688785598Z  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1521) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688793960Z  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1511) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688802367Z  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1339) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688811023Z  ... 4 common frames omitted
2022-06-28T14:17:52.688819230Z Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.freeproxy.parser.model.kafka.KafkaMessage]; nested exception is java.lang.ClassNotFoundException: com.freeproxy.parser.model.kafka.KafkaMessage
2022-06-28T14:17:52.688828306Z  at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:142) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688837754Z  at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688846335Z  at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:572) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688854685Z  at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer.deserialize(ErrorHandlingDeserializer.java:201) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688862907Z  ... 16 common frames omitted
2022-06-28T14:17:52.688870692Z Caused by: java.lang.ClassNotFoundException: com.freeproxy.parser.model.kafka.KafkaMessage
2022-06-28T14:17:52.688888550Z  at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476) ~[na:na]
2022-06-28T14:17:52.688898662Z  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[na:na]
2022-06-28T14:17:52.688907289Z  at org.springframework.boot.loader.LaunchedURLClassLoader.loadClass(LaunchedURLClassLoader.java:151) ~[java.jar:na]
2022-06-28T14:17:52.688915418Z  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[na:na]
2022-06-28T14:17:52.688923583Z  at java.base/java.lang.Class.forName0(Native Method) ~[na:na]
2022-06-28T14:17:52.688931577Z  at java.base/java.lang.Class.forName(Class.java:398) ~[na:na]
2022-06-28T14:17:52.688939753Z  at org.springframework.util.ClassUtils.forName(ClassUtils.java:284) ~[spring-core-5.3.19.jar!/:5.3.19]
2022-06-28T14:17:52.688948555Z  at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:138) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688957079Z  ... 19 common frames omitted
2022-06-28T14:17:52.688964715Z 

I have other applications that send and read messages on Kafka topics with the same settings and they all work fine, but not this application. Ideally, I want to read messages from two Kafka topics (messages in both topics look the same and contain the same objects), but even when I try to read messages from one topic, I get the error shown above. The settings are follows:

class KafkaMessage {

    String id

    IdStatus status

}

@Service
@Slf4j
class ConsumerService {

    Set<String> activeProxies = []

    int getActiveProxiesNumber() {
        activeProxies.size()
    }

    Set<String> activeProxySources = []

    int getActiveProxySourcesNumber() {
        activeProxySources.size()
    }


    @KafkaListener(topics = "ActiveProxies"/*, containerFactory = "KafkaListenerContainerFactoryActiveProxies"*/)
    public void consumeProxyId(KafkaMessage message) {
        log.info("Consuming ${message.id}: ${message.status}")
        if (message.status == IdStatus.ADD) {
            activeProxies.add(message.id)
        }
        if (message.status == IdStatus.DELETE) {
            activeProxies.remove(message.id)
        }
    }

    @KafkaListener(topics = "ActiveProxySources"/*, containerFactory = "KafkaListenerContainerFactoryActiveProxySources"*/)
    public void consumeProxySourceId(KafkaMessage message) {
        log.info("Consuming ${message.id}: ${message.status}")
        if (message.status == IdStatus.ADD) {
            activeProxySources.add(message.id)
        }
        if (message.status == IdStatus.DELETE) {
            activeProxySources.remove(message.id)
        }
    }

}

TopicConfig:

@Configuration
public class TopicConfig {

    @Value(value = "kafka:9092")
    private String bootstrapAddress

    @Value(value = "ActiveProxies")
    private String activeProxies

    @Value(value = "ActiveProxySources")
    private String activeProxySources

//    @Bean
//    public KafkaAdmin kafkaAdmin() {
//        Map<String, Object> configs = new HashMap<>();
//        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
//        return new KafkaAdmin(configs);
//    }

    @Bean
    public NewTopic ActiveProxiesTopic() {
        return TopicBuilder.name(activeProxies)
                .partitions(1)
                .replicas(1)
                .config(org.apache.kafka.common.config.TopicConfig.RETENTION_MS_CONFIG, "60000")
                .build()
    }

    @Bean
    public NewTopic ActiveProxySourcesTopic() {
        return TopicBuilder.name(activeProxySources)
                .partitions(1)
                .replicas(1)
                .config(org.apache.kafka.common.config.TopicConfig.RETENTION_MS_CONFIG, "60000")
                .build()
    }

}

application.properties file:

server.port=30329
spring.data.mongodb.database=free-proxy-engine

spring.kafka.bootstrap-servers=kafka:9092
spring.kafka.consumer.group-id=consumer-Api1
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer

I use Docker-Compose to run Kafka and all other applications

docker-compose.yaml:

version: '2'
services:

  mongodb:
    image: mongo:5.0.9
    restart: unless-stopped

  api:
    image: openjdk:11
    depends_on:
      - mongodb
      - kafka
    restart: unless-stopped
    volumes:
      - ./libs/api-0.0.1-SNAPSHOT.jar:/gjava/java.jar
    environment:
      spring_data_mongodb_host: mongodb
      spring_kafka_consumer_group-id: api1
    command: /bin/bash -c "cd /gjava && chmod +x /gjava/*.jar && java -jar /gjava/java.jar"
    ports:
      - 30329:30329

  zookeeper:
    image: confluentinc/cp-zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
     ZOOKEEPER_CLIENT_PORT: 2181
     ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka
    restart: always
    hostname: kafka
    depends_on:
      - zookeeper
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

I created my own Consumer configuration file for Kafka, but the error remained when I tried to read messages from two topics and from one.

@EnableKafka
@Configuration
class KafkaConsumerConfig {

    @Value(value = "kafka:9092")
    private String bootstrapAddress

    @Bean
    public ConsumerFactory<String, KafkaMessage> ConsumerFactoryActiveProxies() {
        Map<String, Object> props = new HashMap<>()
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress)
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "Api-1")
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*")

        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class)
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class)
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class)
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName())

        return new DefaultKafkaConsumerFactory<>(props/*,
                new StringDeserializer(),
                new JsonDeserializer<>(KafkaMessage.class)*/)
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaMessage>
    KafkaListenerContainerFactoryActiveProxies() {
        ConcurrentKafkaListenerContainerFactory<String, KafkaMessage> factory
                = new ConcurrentKafkaListenerContainerFactory<>()
        factory.setConsumerFactory(ConsumerFactoryActiveProxies())
        factory.setMessageConverter(new StringJsonMessageConverter())
        return factory
    }

    @Bean
    public ConsumerFactory<String, KafkaMessage> ConsumerFactoryActiveProxySources() {
        Map<String, Object> props = new HashMap<>()
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress)
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "Api-2")
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*")

        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class)
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class)
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class)
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName())

        return new DefaultKafkaConsumerFactory<>(props/*,
                new StringDeserializer(),
                new JsonDeserializer<>(KafkaMessage.class)*/)
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaMessage>
    KafkaListenerContainerFactoryActiveProxySources() {
        ConcurrentKafkaListenerContainerFactory<String, KafkaMessage> factory
                = new ConcurrentKafkaListenerContainerFactory<>()
        factory.setConsumerFactory(ConsumerFactoryActiveProxySources())
        factory.setMessageConverter(new StringJsonMessageConverter())
        return factory
    }
}

I will be grateful for your help.


Solution

  • By default, the deserializer will use type information in headers to determine which type to create.

    Caused by: java.lang.ClassNotFoundException: com.freeproxy.parser.model.kafka.KafkaMessage

    Most likely, KafkaMessage is in a different package on the sending side.

    There are a couple of solutions:

    https://docs.spring.io/spring-kafka/docs/current/reference/html/#serdes-json-config

    1. Set JsonDeserializer.USE_TYPE_INFO_HEADERS to false and JsonDeserializer.VALUE_DEFAULT_TYPE to com.new.package.kafka.KafkaMessage (the fully qualified name of KafkaMessage on the receiving side).

    2. Use type mapping: https://docs.spring.io/spring-kafka/docs/current/reference/html/#serdes-mapping-types

    I suggest you read this whole section https://docs.spring.io/spring-kafka/docs/current/reference/html/#json-serde