If I have a enable.auto.commit=false
and I call consumer.poll()
without calling consumer.commitAsync()
after, why does consumer.poll()
return
new records the next time it's called?
Since I did not commit my offset, I would expect poll()
would return the latest offset which should be the same records again.
I'm asking because I'm trying to handle failure scenarios during my processing. I was hoping without committing the offset, the poll()
would return the same records again so I can re-process those failed records again.
public class MyConsumer implements Runnable {
@Override
public void run() {
while (true) {
ConsumerRecords<String, LogLine> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord record : records) {
try {
//process record
consumer.commitAsync();
} catch (Exception e) {
}
/**
If exception happens above, I was expecting poll to return new records so I can re-process the record that caused the exception.
**/
}
}
}
}
The starting offset of a poll is not decided by the broker but by the consumer. The consumer tracks the last received offset and asks for the following bunch of messages during the next poll.
Offset commits come into play when a consumer stops or fails and another instance that is not aware of the last consumed offset picks up consumption of a partition.
KafkaConsumer has pretty extensive Javadoc that is well worth a read.