Search code examples
springapache-kafkaspring-el

SpEL KafkaListener. How can i inject custom deserializer through properties?


I am using spring. I have a configured ObjectMapper for the entire project and I use it to set up a kafka deserializer. And then I need a custom kafka deserializer to be used in KafkaListener.

I'm configuring KafkaListener via autoconfiguration, not via @Configuration class.

@Component
@RequiredArgsConstructor
public class CustomMessageDeserializer implements Deserializer<MyMessage> {
    private final ObjectMapper objectMapper;

    @SneakyThrows
    @Override
    public MyMessage deserialize(String topic, byte[] data) {
        return objectMapper.readValue(data, MyMessage.class);
    }
}

If i do like this

@KafkaListener(
    topics = {"${topics.invite-user-topic}"},
    properties = {"value.deserializer=com.service.deserializer.CustomMessageDeserializer"}
)
public void receiveInviteUserMessages(MyMessage myMessage) {}

I received KafkaException: Could not find a public no-argument constructor

But with public no-argument constructor in CustomMessageDeserializer class i am getting NPE because ObjectMapper = null. It creates and uses a new class, not a spring component.

@KafkaListener supports SpEL expressions.

And I think that this problem can be solved using SpEL. Do you have any idea how to inject spring bean CustomMessageDeserializer with SpEL?


Solution

  • There are no easy ways to do it with SPeL.

    Analysis

    To get started, see the JavaDoc for @KafkaListener#properties:

    /**
    * 
    * SpEL expressions must resolve to a String ...
    */
    

    The value of value.deserializer is used to instantiate the specified deserializer class. Let's follow the call chain:

    1. You specify this value in the @KafkaListener annotation, then you are probably not creating a bean of the ConsumerFactory.class. So Spring creates this bean class itself - see KafkaAutoConfiguration#kafkaConsumerFactory.
    2. Next is the creation of the returned object new DefaultKafkaConsumerFactory(...) as ConsumerFactory<?,?> using the constructor for default delivery expressions keyDeserializer/valueDeserializer = () -> null
    3. This factory is used to create a Kafka consumer (The entry point is the constructor KafkaMessageListenerContainer#ListenerConsumer, then KafkaMessageListenerContainer.this.consumerFactory.createConsumer...)
    4. In the KafkaConsumer constructor, the valueDeserializer object is being created, because it is null (for the default factory of point 2 above):
    if (valueDeserializer == null) {
         this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
    
    1. The implementation of config.getConfiguredInstance involves instantiating your deserializer class via a parameterless constructor using reflection and your String "com.service.deserializer.CustomMessageDeserializer" class name

    Solutions

    1. To use value.deserializer with your customized ObjectMapper, you must create the ConsumerFactory bean yourself using the setValueDeserializer(...) method. This is also mentioned in the second Important part of the JSON.Mapping_Types.Important documentation
    2. If you don't want to create a ConsumerFactory bean, and also don't have complicated logic in your deserializer (you only have return objectMapper.readValue(data, MyMessage.class);), then register DefaultKafkaConsumerFactoryCustomizer:
    @Bean
    // inject your custom objectMapper
    public DefaultKafkaConsumerFactoryCustomizer customizeJsonDeserializer(ObjectMapper objectMapper) {  
        return consumerFactory ->
                consumerFactory.setValueDeserializerSupplier(() ->
                        new org.springframework.kafka.support.serializer.JsonDeserializer<>(objectMapper));
    }
    

    In this case, you don't need to create your own CustomMessageDeserializer class (remove it) and Spring will automatically parse the message into your MyMessage. @KafkaListener annotation should also not contains the property properties = {"value.deserializer=com.my.kafka_test.component.CustomMessageDeserializer"}. This DefaultKafkaConsumerFactoryCustomizer bean will automatically be used to configure the default ConsumerFactory<?, ?> (see the implementation of the KafkaAutoConfiguration#kafkaConsumerFactory method)