Java Spring Kafka Template producer lost messages on broker restart

I'm using spring-boot (2.1.6.RELEASE) with spring-kafka (2.2.7.RELEASE) and sending messages to my Kafka cluster with KafkaTemplate. Sometimes (usually when I restart a Kafka broker or trigger a rebalance) I see errors like this when sending messages:

org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

Given the default Kafka producer configs, I expect failed sends to be retried, but they are not. Default Kafka producer configs:

retries: 2147483647
acks: 1

My config is this:

    @Bean
    public Map<String, Object> producerConfigs() {
        // See the Kafka producer configuration documentation for more properties
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        return props;
    }

    @Bean
    public ProducerFactory<Long, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<Long, String> kafkaTemplate(KafkaTemplateProducerListener<Long, String> kafkaTemplateProducerListener,
                                                     ProducerFactory<Long, String> producerFactory) {
        KafkaTemplate<Long, String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
        return kafkaTemplate;
    }

and I'm sending messages like this:

kafkaTemplate.send(topicName, key, body);

I have searched all over the internet, and everyone says that this configuration with these retries and acks settings should work, but it doesn't. What am I missing?



  • After spending some time debugging this, I found the solution:

    props.put(ProducerConfig.ACKS_CONFIG, "all");

    For more information about this property:

    A very good blog post showing different scenarios in which you can lose messages in Kafka:

    Side note: from this answer I found out that it is a good idea to call this if you don't want to lose messages on shutdown:

    public void flush()
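
As a hedged illustration of the fix above, the sketch below sets the reliability-related producer properties explicitly instead of relying on defaults, which can differ between kafka-clients versions. The class and method names (`ReliableProducerProps`, `withReliabilitySettings`) are hypothetical and not part of spring-kafka or kafka-clients. The keys `acks` and `retries` are the plain-string values of `ProducerConfig.ACKS_CONFIG` and `ProducerConfig.RETRIES_CONFIG`, used here so the example needs only `java.util`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of spring-kafka or kafka-clients.
final class ReliableProducerProps {

    private ReliableProducerProps() {
    }

    /**
     * Returns a copy of the given producer properties with acks and retries
     * set explicitly. The input map is left unmodified.
     */
    static Map<String, Object> withReliabilitySettings(Map<String, Object> base) {
        Map<String, Object> props = new HashMap<>(base);
        // Same key as ProducerConfig.ACKS_CONFIG: wait for all in-sync replicas to acknowledge.
        props.put("acks", "all");
        // Same key as ProducerConfig.RETRIES_CONFIG: keep retrying retriable send errors.
        props.put("retries", Integer.MAX_VALUE);
        return props;
    }
}
```

With the configuration from the question, this could be wired in as `new DefaultKafkaProducerFactory<>(ReliableProducerProps.withReliabilitySettings(producerConfigs()))`. Note that `acks=all` only waits for the replicas that are currently in sync, so how much protection it gives also depends on the topic's replication factor and its `min.insync.replicas` setting.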