Search code examples
javaapache-kafkakafka-consumer-apispring-kafka

Use kafkaListener with syncCommit, but they not work


I have an application that listens to a Kafka topic and reads 1 message from it in 1 thread. After reading a message, there is a certain logic, during which there are two options - everything is successful and a commit occurs call method ...acknowledge() to offset the offset and move on to the next message, or everything is unsuccessful and this message must be read again and the offset is not committed (...acknowledge() is not called) But now for some reason I can’t read the same message and always move on to the next one, although I don’t call ...acknowledge() and it seems like the offset for my groupId should be on the same message.

I'm using spring-kafka and the following settings are specified:

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.max-poll-records=1

And I programmatically set the settings:

...setGroupId("test");
...setSyncCommits(true);
...setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
...setConcurrency(1);

Solution

  • The Kafka consumer keeps track of offsets in-memory, without resorting to the offsets saved on the cluster. The cluster offsets are only used when the consumer is connecting (or otherwise doesn't know the offset).

    You need to either retry in-memory, or manually seek() the consumer to the previous position.