Search code examples
javaspringapache-kafkakafka-consumer-apispring-kafka

Reading the same message several times from Kafka


I use Spring Kafka API to implement Kafka consumer with manual offset management:

@KafkaListener(topics = "some_topic")
public void onMessage(@Payload Message message, Acknowledgment acknowledgment) {
    if (someCondition) {
        acknowledgment.acknowledge();
    }
}

Here, I want the consumer to commit the offset only if someCondition holds. Otherwise the consumer should sleep for some time and read the same message again.

Kafka Configuration:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig());
    factory.getContainerProperties().setAckMode(MANUAL);
    return factory;
}

private Map<String, Object> consumerConfig() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    ...
    return props;
}

With the current configuration, if someCondition == false, consumer doesn't commit the offset, but still reads the next messages. Is there a way to make the consumer reread a message if the Kafka acknowledgement wasn't performed?


Solution

  • You can stop and restart the container and it will be re-sent.

    With the upcoming 1.1 release, you can seek to the required offset and it will be resent.

    But you will still see later messages first if they have already been retrieved so you will have to discard those too.

    The second milestone has that feature and we expect it to be released next week.