Search code examples
javaspring-bootapache-kafkaspring-kafka

Spring boot Kafka send objects between different microservices


This is my producer microservice kafka configuration:

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapService;

    //producer factory
    @Bean
    public ProducerFactory<String, Object> producerFactory (){
        Map<String, Object> configMap = new HashMap<>();
        configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapService);
        configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return new DefaultKafkaProducerFactory<>(configMap);
    }

    //inviare messaggi
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(){
        return new KafkaTemplate<>(producerFactory());
    }
}

I send the messages like this:

kafkaTemplate.send(TOPIC_NAME, message);

I have no problems with the producer,

This is the consumer microservice kafka configuration:

@Configuration
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>();
        jsonDeserializer.addTrustedPackages("*");
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jsonDeserializer);
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        return Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class,
                ConsumerConfig.GROUP_ID_CONFIG, "testId"
        );
    }
}

With the same configuration but with String istead of Object or instead of my custom object I get the following stacktrace:

    java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
    at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:149) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1763) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1286) ~[spring-kafka-2.8.3.jar:2.8.3]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition kafka-topic-2-0 at offset 25. If needed, please seek past the record to continue consumption.
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:134) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1652) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1800(Fetcher.java:1488) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:721) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:672) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1277) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.0.0.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1510) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1500) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1328) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1237) ~[spring-kafka-2.8.3.jar:2.8.3]
    ... 3 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.example.kafkaproducer.model.Message]; nested exception is java.lang.ClassNotFoundException: com.example.kafkaproducer.model.Message
    at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:142) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:569) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1420) ~[kafka-clients-3.0.0.jar:na]
    ... 15 common frames omitted
Caused by: java.lang.ClassNotFoundException: com.example.kafkaproducer.model.Message
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[na:na]
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[na:na]
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[na:na]
    at java.base/java.lang.Class.forName0(Native Method) ~[na:na]
    at java.base/java.lang.Class.forName(Class.java:398) ~[na:na]
    at org.springframework.boot.devtools.restart.classloader.RestartClassLoader.loadClass(RestartClassLoader.java:145) ~[spring-boot-devtools-2.6.4.jar:2.6.4]
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[na:na]
    at java.base/java.lang.Class.forName0(Native Method) ~[na:na]
    at java.base/java.lang.Class.forName(Class.java:398) ~[na:na]
    at org.springframework.util.ClassUtils.forName(ClassUtils.java:284) ~[spring-core-5.3.16.jar:5.3.16]
    at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:138) ~[spring-kafka-2.8.3.jar:2.8.3]
    ... 18 common frames omitted

Do you have any advice for having producer and consumer in different services?

[EDIT] I'm adding the listener since it has been asked:

@KafkaListener(topics = TOPIC_NAME, groupId = "testId")
public void listener(@Payload Message rcvMessage){

    log.info("message: {}", rcvMessage);

}

Note that the Message class is defined in both projects with the same parameters.

[Edit 2] It now works, I removed the headers in the deserializer like this:

@Bean
public ConsumerFactory<String, Message> consumerFactory() {
    JsonDeserializer<Message> jsonDeserializer = new JsonDeserializer<>(Message.class, false);
    jsonDeserializer.addTrustedPackages("*");
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jsonDeserializer);
}

Is this acceptable or is it just a work around?


Solution

  • When you send a message to kafka, a header is sent along with the message. Inside the header, by default, is the full address of the Custom Object.

    For example:

     com.example.kafkaproducer.model.Message
    

    The consumer side of the message class must be created in the same path of the producer. Because validation is done when receiving the message on the consumer side, if a path other than the path is specified, the following error is received.

    Caused by: java.lang.ClassNotFoundException: com.example.kafkaproducer.model.Message

    Most likely, if you change the Message class path on the consumer side, your error will be fixed.