Search code examples
javaapache-kafkaspring-kafka

Incompatible types when trying serialization with producer customizer


I am using spring for kafka, and I want to use the DefaultKafkaProducerFactoryCustomizer to customize my producer factory because I need multiple value serializer, so I start with

  @Bean
  public DefaultKafkaProducerFactoryCustomizer defaultKafkaProducerFactoryCustomizer() {
    return (producerFactory) -> {
      producerFactory.setKeySerializer(new StringSerializer());
    };
  }

But the compiler isn't happy

incompatible types: org.apache.kafka.common.serialization.StringSerializer cannot be 
converted to org.apache.kafka.common.serialization.Serializer<capture#1 of ?>

same issue with (I use the ByteArraySerializer to send message that failed to deserialize to dlq)

  producerFactory.setValueSerializer(
      new DelegatingByTypeSerializer(
          Map.of(
              byte[].class,
              new ByteArraySerializer(),
              Foo.class,
              new JsonSerializer<>())));

I got:

incompatible types: org.springframework.kafka.support.serializer.DelegatingByTypeSerializer 
cannot be converted to org.apache.kafka.common.serialization.Serializer<capture#1 of ?>

but it work fine with just

producerFactory.setValueSerializer(new JsonSerializer<>());

the DefaultKafkaProducerFactoryCustomizer class is defined like this

@FunctionalInterface
public interface DefaultKafkaProducerFactoryCustomizer {

    /**
     * Customize the {@link DefaultKafkaProducerFactory}.
     * @param producerFactory the producer factory to customize
     */
    void customize(DefaultKafkaProducerFactory<?, ?> producerFactory);

}

How to solve this issue ?

I know the other alternatives to configure ValueSerialize and KeySerializer but i want to know first why my code isn't working as excepted


Solution

  • The compilation error on setKeySerializer goes away after casting the producerFactory variable from DefaultKafkaProducerFactory<?, ?> to DefaultKafkaProducerFactory<String, ?>. A similar cast works for setValueSerializer method as well. Bean definition below:

    @Bean
    public DefaultKafkaProducerFactoryCustomizer defaultKafkaProducerFactoryCustomizer() {
        return (producerFactory) -> {
            ((DefaultKafkaProducerFactory<String, ?>) producerFactory).setKeySerializer(new StringSerializer());
            ((DefaultKafkaProducerFactory<?, Object>) producerFactory)
                    .setValueSerializer(new DelegatingByTypeSerializer(
                            Map.of(
                                byte[].class, 
                                new ByteArraySerializer(), 
                                Foo.class, 
                                new JsonSerializer<>())));
        };
    }