Search code examples
javaspring-bootapache-kafkaspring-kafkakafka-producer-api

Java Spring Kafka Template producer lost messages on broker restart


I'm using spring-boot (2.1.6.RELEASE) with spring-kafka (2.2.7.RELEASE) and I'm sending messages to my kafka cluster using KafkaTemplate. But sometimes (usually when I restart a kafka-broker or do a rebalance) I see errors like this when I'm sending messages :

org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

Due to the default Kafka producer configs, I expect send failures to be retried but they are not. Default Kafka producer configs:

retries: 2147483647  (https://kafka.apache.org/documentation/#retries)
acks: 1               (https://kafka.apache.org/documentation/#acks)

My config is this:

@Bean
    public Map<String, Object> producerConfigs()
    {
        // See https://kafka.apache.org/documentation/#producerconfigs for more properties
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        return props;
    }

    @Bean
    public ProducerFactory<Long, String> producerFactory()
    {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<Long, String> kafkaTemplate(KafkaTemplateProducerListener<Long, String> kafkaTemplateProducerListener,
                                                     ProducerFactory<Long, String> producerFactory)
    {
        KafkaTemplate<Long, String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
        kafkaTemplate.setProducerListener(kafkaTemplateProducerListener);
        return kafkaTemplate;
    }

and I'm sending messages like this:

kafkaTemplate.send(topicName, key, body);

I have searched all over the internet and everyone says that this configuration with the retries and acks must work but it doesn't. What I'm missing ?

Thanks


Solution

  • After spend some time debugging this I found the solution:

    props.put(ProducerConfig.ACKS_CONFIG, "all");
    

    For more information about this property: https://kafka.apache.org/documentation/#acks


    Very good blog showing different scenarios that you can lose messages in kafka:



    Side note - from this answer I found out that it is good idea to use this if you don't want to lose messages on shutdown:

    @PreDestroy
    public void flush()
    {
        kafkaTemplate.flush();
    }