I am using Spring. I have an ObjectMapper configured for the entire project, and I want to use it to set up a Kafka deserializer. I then need that custom Kafka deserializer to be used in a @KafkaListener.
I'm configuring the @KafkaListener via autoconfiguration, not via a @Configuration class.
@Component
@RequiredArgsConstructor
public class CustomMessageDeserializer implements Deserializer<MyMessage> {

    private final ObjectMapper objectMapper;

    @SneakyThrows
    @Override
    public MyMessage deserialize(String topic, byte[] data) {
        return objectMapper.readValue(data, MyMessage.class);
    }
}
If I do it like this:
@KafkaListener(
    topics = {"${topics.invite-user-topic}"},
    properties = {"value.deserializer=com.service.deserializer.CustomMessageDeserializer"}
)
public void receiveInviteUserMessages(MyMessage myMessage) {}
I get KafkaException: Could not find a public no-argument constructor.
But when I add a public no-argument constructor to the CustomMessageDeserializer class, I get an NPE because the ObjectMapper is null. A new instance of the class is created and used, not the Spring component.
@KafkaListener supports SpEL expressions.
And I think that this problem can be solved using SpEL. Do you have any idea how to inject spring bean CustomMessageDeserializer with SpEL?
There is no easy way to do this with SpEL.
To get started, see the Javadoc for @KafkaListener#properties:
/**
*
* SpEL expressions must resolve to a String ...
*/
The value of value.deserializer is used to instantiate the specified deserializer class. Let's follow the call chain:

1. If you configure the listener only through the @KafkaListener annotation, then you are probably not creating a ConsumerFactory bean yourself. So Spring creates this bean itself: see KafkaAutoConfiguration#kafkaConsumerFactory.
2. That method creates a new DefaultKafkaConsumerFactory(...) as the ConsumerFactory<?,?>, using the constructor whose default supplier expressions are keyDeserializer/valueDeserializer = () -> null.
3. The factory is then used in KafkaMessageListenerContainer#ListenerConsumer (see KafkaMessageListenerContainer.this.consumerFactory.createConsumer...).
4. In the KafkaConsumer constructor, the valueDeserializer object is created because it is null (for the default factory of point 2 above):

    if (valueDeserializer == null) {
        this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);

5. config.getConfiguredInstance instantiates your deserializer class by reflection through a parameterless constructor, using your String "com.service.deserializer.CustomMessageDeserializer" as the class name.

To set up value.deserializer with your customized ObjectMapper, you must create the ConsumerFactory bean yourself using the setValueDeserializer(...) method. This is also mentioned in the second Important part of the JSON.Mapping_Types.Important documentation.

If you don't want to create the ConsumerFactory bean, and you also don't have complicated logic in your deserializer (you only have return objectMapper.readValue(data, MyMessage.class);), then register a DefaultKafkaConsumerFactoryCustomizer:

@Bean
// inject your custom objectMapper
public DefaultKafkaConsumerFactoryCustomizer customizeJsonDeserializer(ObjectMapper objectMapper) {
    return consumerFactory ->
            consumerFactory.setValueDeserializerSupplier(() ->
                    new org.springframework.kafka.support.serializer.JsonDeserializer<>(objectMapper));
}
In this case, you don't need to create your own CustomMessageDeserializer class (remove it), and Spring will automatically parse the message into your MyMessage.

The @KafkaListener annotation should also not contain the property properties = {"value.deserializer=com.my.kafka_test.component.CustomMessageDeserializer"}. This DefaultKafkaConsumerFactoryCustomizer bean will automatically be used to configure the default ConsumerFactory<?, ?> (see the implementation of the KafkaAutoConfiguration#kafkaConsumerFactory method).
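
To see in isolation why the class-name route from the call chain ends in a null ObjectMapper, here is a minimal sketch in plain Java with no Spring or Kafka on the classpath. StubMapper and StubDeserializer are hypothetical stand-ins for ObjectMapper and CustomMessageDeserializer, and instantiateByName is only my rough approximation of what happens to the value.deserializer string, not the actual Kafka code:

```java
import java.nio.charset.StandardCharsets;

class ReflectiveInstantiationDemo {

    // Hypothetical stand-in for the project-wide ObjectMapper bean.
    static class StubMapper {
        String read(byte[] data) {
            return new String(data, StandardCharsets.UTF_8);
        }
    }

    // Hypothetical stand-in for CustomMessageDeserializer.
    static class StubDeserializer {
        private final StubMapper mapper;

        // The constructor a DI container would call, passing the shared mapper.
        StubDeserializer(StubMapper mapper) {
            this.mapper = mapper;
        }

        // The no-argument constructor that class-name instantiation requires.
        public StubDeserializer() {
            this.mapper = null;
        }

        String deserialize(byte[] data) {
            return mapper.read(data); // NullPointerException when mapper is null
        }
    }

    // Assumed approximation of class-name instantiation: load the class by
    // name and call its no-argument constructor. No container is involved,
    // so nothing can be injected.
    static Object instantiateByName(String className) {
        try {
            return Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + className, e);
        }
    }

    // Returns true when deserialize fails because the mapper was never set.
    static boolean failsWithoutMapper(StubDeserializer deserializer) {
        try {
            deserializer.deserialize("hello".getBytes(StandardCharsets.UTF_8));
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        StubDeserializer injected = new StubDeserializer(new StubMapper());
        StubDeserializer reflective =
                (StubDeserializer) instantiateByName(StubDeserializer.class.getName());

        System.out.println("injected fails: " + failsWithoutMapper(injected));
        System.out.println("reflective fails: " + failsWithoutMapper(reflective));
    }
}
```

The instance built through the constructor with a mapper works, while the one created from the class name has no mapper and fails on first use. That is the same situation as the NPE in the question, and it is why the fix has to hand Kafka an already constructed deserializer (or a supplier of one) instead of a class name.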