I am using Spring for Apache Kafka, and I want to use the DefaultKafkaProducerFactoryCustomizer
to customize my producer factory because I need multiple value serializers, so I started with:
@Bean
public DefaultKafkaProducerFactoryCustomizer defaultKafkaProducerFactoryCustomizer() {
    return (producerFactory) -> {
        producerFactory.setKeySerializer(new StringSerializer());
    };
}
But the compiler isn't happy:
incompatible types: org.apache.kafka.common.serialization.StringSerializer cannot be
converted to org.apache.kafka.common.serialization.Serializer<capture#1 of ?>
I get the same issue with the following (I use the ByteArraySerializer to send messages that failed to deserialize to the DLQ):
producerFactory.setValueSerializer(
    new DelegatingByTypeSerializer(
        Map.of(
            byte[].class,
            new ByteArraySerializer(),
            Foo.class,
            new JsonSerializer<>())));
I got:
incompatible types: org.springframework.kafka.support.serializer.DelegatingByTypeSerializer
cannot be converted to org.apache.kafka.common.serialization.Serializer<capture#1 of ?>
But it works fine with just:
producerFactory.setValueSerializer(new JsonSerializer<>());
The DefaultKafkaProducerFactoryCustomizer interface is defined like this:
@FunctionalInterface
public interface DefaultKafkaProducerFactoryCustomizer {
    /**
     * Customize the {@link DefaultKafkaProducerFactory}.
     * @param producerFactory the producer factory to customize
     */
    void customize(DefaultKafkaProducerFactory<?, ?> producerFactory);
}
How can I solve this issue? I know the other alternatives for configuring the value serializer and key serializer, but I first want to know why my code isn't working as expected.
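Because the customizer receives a DefaultKafkaProducerFactory<?, ?>, the compiler treats the key and value types as unknown captured types. setKeySerializer therefore expects a Serializer<capture of ?>, and no concrete serializer such as StringSerializer can be proven to match it. The compilation error on setKeySerializer goes away after casting the producerFactory variable from DefaultKafkaProducerFactory<?, ?> to DefaultKafkaProducerFactory<String, ?>. A similar cast works for the setValueSerializer method as well. Note that both casts are unchecked, so the compiler will emit a warning. Bean definition below: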
@Bean
public DefaultKafkaProducerFactoryCustomizer defaultKafkaProducerFactoryCustomizer() {
    return (producerFactory) -> {
        ((DefaultKafkaProducerFactory<String, ?>) producerFactory).setKeySerializer(new StringSerializer());
        ((DefaultKafkaProducerFactory<?, Object>) producerFactory)
            .setValueSerializer(new DelegatingByTypeSerializer(
                Map.of(
                    byte[].class,
                    new ByteArraySerializer(),
                    Foo.class,
                    new JsonSerializer<>())));
    };
}
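
The same behaviour can be reproduced without any Kafka dependency, which may make the cause easier to see. The sketch below is only an illustration: Serializer, StringSerializer and Factory are hypothetical stand-ins declared locally for the real Kafka and Spring types, and the class name WildcardCaptureDemo is made up for the example.

```java
import java.nio.charset.StandardCharsets;

public class WildcardCaptureDemo {

    // Hypothetical stand-in for org.apache.kafka.common.serialization.Serializer<T>.
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    // Hypothetical stand-in for Kafka's StringSerializer.
    static class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Hypothetical stand-in for DefaultKafkaProducerFactory<K, V>.
    static class Factory<K, V> {
        private Serializer<K> keySerializer;

        void setKeySerializer(Serializer<K> keySerializer) {
            this.keySerializer = keySerializer;
        }

        Serializer<K> getKeySerializer() {
            return keySerializer;
        }
    }

    // Mirrors the shape of customize(DefaultKafkaProducerFactory<?, ?> producerFactory).
    static void customize(Factory<?, ?> factory) {
        // Does not compile: K is "capture of ?", so Serializer<String> is not
        // known to be a Serializer<K>.
        // factory.setKeySerializer(new StringSerializer());

        // Compiles, but only because the unchecked cast asserts that K is String.
        @SuppressWarnings("unchecked")
        Factory<String, ?> typed = (Factory<String, ?>) factory;
        typed.setKeySerializer(new StringSerializer());
    }

    public static void main(String[] args) {
        Factory<String, Object> factory = new Factory<>();
        customize(factory);
        byte[] bytes = factory.getKeySerializer().serialize("key");
        System.out.println("serialized " + bytes.length + " bytes");
    }
}
```

Since the cast is unchecked, nothing verifies at runtime that the factory really uses String keys; it is safe only when you know which key and value types the factory was created for.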