Search code examples
javaspring-bootapache-kafkaspring-kafkajsonserializer

springboot kafka value.serializer = StringSerializer despite producerConfig and the application.yml showing otherwise


I'm having trouble with my Kafka producer. For some reason, the value.serializer is being set to StringSerializer despite me specifically setting it to JsonSerializer in both the producerConfig and the application.yml as you can see below

public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    public Map<String, Object> producerConfig(){
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    public ProducerFactory<String, CustomType > producerFactory(){
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, CustomType > kafkaTemplate(ProducerFactory<String, CustomType > producerFactory){
        return new KafkaTemplate<>(producerFactory);
    }
}

And once again in the application.yml file, just for good measure

spring:
  kafka:
    producer:
      value.serializer: org.springframework.kafka.support.serializer.JsonSerializer

The problem lies here in the last line of this producerConfig; somehow the value.serializer is now StringSerializer.

ProducerConfig values: 
    acks = -1
    batch.size = 65536
    bootstrap.servers = [***********,***********]
    buffer.memory = 33554432
    client.dns.lookup = use_all_dns_ips
    client.id = producer-1
    compression.type = lz4
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = true
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 52428800
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

Unsuprisingly this is causing a org.apache.kafka.common.errors.SerializationException: Can't convert value of class CustomType to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

I can confirm that the correct profile is active from The following 1 profile is active: "test"

I also pinged the bootStrapServers and both of them are up and running

Another thing is that I searched all files in my projects and the only place where StringSerializer is even used is in the producerConfig when setting the KEY_SERIALIZER_CLASS_CONFIG.

Just a bit more context, below is the function I use in order to send the kafka message

@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaTelemetryRawSend {

    @Autowired
    private KafkaTemplate<String, CustomType > kafkaTemplate;

    public void send(CustomType customType){
        System.out.println("kafka sending");
        kafkaTemplate.send("topic", customType);
    }
}

Any help/suggestions to put me in the right direction would be much appreciated


Solution

  • I think the mistake is that your producerConfig() is not being used (try to debug it), because your producerFactory() method does not have @Bean annotation and Spring creates own Bean in org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration class:

    @Bean
    @ConditionalOnMissingBean({ProducerFactory.class})
    public ProducerFactory<?, ?> kafkaProducerFactory(ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {...}
    

    This KafkaAutoConfiguration class has field org.springframework.boot.autoconfigure.kafka.KafkaProperties- it is your application.yml binding with spring.kafka... properties, which is used for create bean ProducerFactory. But your application.yml has an invalid property value.serializer instead of value-serializer.

    Solution

    Try to add @Bean annotation on your producerFactory() method or fix name of value.serializer property to value-serializer.

    But I recommend you use Spring KafkaProperties in your @Configuration class and define kafka properties in the application.yml file:

    @Configuration
    public class KafkaProducerConfig {
        private final KafkaProperties kafkaProperties;
    
        public KafkaConfig(KafkaProperties kafkaProperties) {
            this.kafkaProperties = kafkaProperties;
        }
    
        @Bean
        public ProducerFactory<String, CustomType> producerFactory() {
            return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
        }
    
        @Bean
        public KafkaTemplate<String, CustomType> kafkaTemplate(ProducerFactory<String, CustomType> producerFactory) {
            return new KafkaTemplate<>(producerFactory);
        }
    }
    

    with application.yml:

    spring:
      kafka:
        producer:
          batch-size: ...
          ...
          properties:
            ...
          value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
        bootstrap-servers:
          - localhost:9092