spring, spring-boot, apache-kafka, spring-kafka

Offset is being committed even though acknowledge() is never called


I am implementing a microservice that reads messages from a Kafka queue and writes them to a database. I am using spring-boot 1.5.6.RELEASE and spring-kafka 1.3.0.RELEASE. To avoid losing data, I need to be sure that each message has been persisted in the database before its offset is committed, so I set enable.auto.commit to false and the AckMode to MANUAL_IMMEDIATE. Here is my Kafka configuration:

@Configuration
@EnableKafka
public class KafkaConfiguration {

  ...

  @Bean
  public Map<String, Object> consumerConfigs() {
    return new HashMap<String, Object>() {
      {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
        put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
      }
    };
  }

  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

    return factory;
  }

  ...
}

To implement the listener I am using the @KafkaListener annotation. After the message has been persisted in the database, I call the acknowledge() method to commit the offset. Here is what my listener looks like:

@KafkaListener(topics = "${kafka.myTopic}")
public void receive(ConsumerRecord<String, String> payload, Acknowledgment acknowledgment) {
  // persist message here

  acknowledgment.acknowledge();

  latch.countDown();
}

To test my application I stopped the database, so that when the business logic tried to persist the message a runtime exception would be thrown before the acknowledge() method could commit the offset:

1) Stop the database.

2) Send a message with content MESSAGE_1.

3) Start the database.

4) Send another message with content MESSAGE_2.

The end result was that the database contained only MESSAGE_2, so the first message was lost. The only way I could get both messages into the database was to restart the microservice after starting the database:

1) Stop the database.

2) Send a message with content MESSAGE_1.

3) Start the database.

4) Restart the microservice.

5) Send another message with content MESSAGE_2.

This time both messages were in the database. My question is: why, in the first scenario, was the offset committed even though a runtime exception was thrown and acknowledge() was never called? And what is the correct way to implement my Kafka listener so that I don't lose data if something goes wrong while a received message is being processed?

Thank you in advance!


Solution

  • You have to study how Apache Kafka works.

    The committed offset only matters to new consumers in the same group, or to the same consumer after a restart. It has no effect on a consumer that is already running: the consumer tracks its current position in memory, so commits are unrelated to the record-fetching process. That is why MESSAGE_1 was skipped until the service was restarted.

    Consider seeking the consumer back to the position you are interested in: https://docs.spring.io/spring-kafka/docs/2.0.0.RELEASE/reference/html/_reference.html#seek

    Also see this GH issue: https://github.com/spring-projects/spring-kafka/issues/470
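
    To make the difference between the in-memory position and the committed offset concrete, here is a minimal sketch in plain Java. It is a toy model, not the Kafka or spring-kafka API: ToyConsumer, drain and run are hypothetical names invented for illustration, and the "database" is just a list. It replays the two scenarios from the question, plus a third one where the failed record's offset is sought back to. In a real listener the rewind would go through the seek mechanism described in the linked documentation.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one partition and one consumer (hypothetical, not the Kafka client). */
public class OffsetDemo {

    static class ToyConsumer {
        private final List<String> log; // records in the partition
        long position = 0;              // next offset to fetch, kept in memory only
        long committed = 0;             // where a restarted consumer would resume

        ToyConsumer(List<String> log) { this.log = log; }

        boolean hasNext() { return position < log.size(); }

        // Fetching advances the position whether or not anything was committed.
        String poll() { return log.get((int) position++); }

        // Stands in for acknowledge(): commit everything fetched so far.
        void commit() { committed = position; }

        void seek(long offset) { position = offset; }

        // A restart forgets the in-memory position and resumes from the commit.
        void restart() { position = committed; }
    }

    /** One pass over the available records; stops at the first failed persist. */
    static void drain(ToyConsumer c, List<String> db, boolean dbUp, boolean seekOnFailure) {
        while (c.hasNext()) {
            long offset = c.position;
            String record = c.poll();
            if (!dbUp) {                 // persisting fails, so commit() is never reached
                if (seekOnFailure) {
                    c.seek(offset);      // rewind so the record is delivered again
                }
                return;                  // the listener call ended with an exception
            }
            db.add(record);
            c.commit();
        }
    }

    /** Replays the test from the question and returns the database contents. */
    static List<String> run(boolean seekOnFailure, boolean restartAfterFailure) {
        List<String> log = new ArrayList<>();
        List<String> db = new ArrayList<>();
        ToyConsumer c = new ToyConsumer(log);

        log.add("MESSAGE_1");
        drain(c, db, false, seekOnFailure);  // database stopped
        if (restartAfterFailure) {
            c.restart();
        }
        log.add("MESSAGE_2");
        drain(c, db, true, seekOnFailure);   // database running again
        return db;
    }

    public static void main(String[] args) {
        System.out.println(run(false, false)); // [MESSAGE_2]
        System.out.println(run(false, true));  // [MESSAGE_1, MESSAGE_2]
        System.out.println(run(true, false));  // [MESSAGE_1, MESSAGE_2]
    }
}
```

    In the first run nothing was committed for MESSAGE_1, yet it is still skipped because the running consumer's position had already moved past it. Committing after MESSAGE_2 then also covers MESSAGE_1, so even a later restart would not bring it back.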