what's my understanding is as of now when I set auto commit to false
and ack mode to manual immediate
then any exception occurs. I'm not going to ack to broker. So when I restart my consumer process with same consumergroupid
I need to read the message again since I didn't ack.
@KafkaListener(topics = "testtopic", groupId = "testgroupID")
public void listenGroupFoo(String message, Acknowledgment acknowledgment,
@Header(KafkaHeaders.OFFSET) int offsets,
@Header(value = KafkaHeaders.RECEIVED_MESSAGE_KEY, required =false) Integer key,
@Payload String payload,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
//performing some process like insert to DB or
calling some downstream process, once this processes is done
I would like to inform kafka broker that I'm done with processing
by implementing acknowledgment.acknowledge(); for each individual message
one by one
}
and ConsumerFactory config is
enable.auto.commit = false
auto.offset.reset = earliest
and ConcurrentKafkaListenerContainerFactory config is
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
That is exactly what will happen, unless you ack the next record in the partition; in that case the failed record will be skipped; Kafka only keeps a "last" committed offset.
If you throw an exception for the failed record, by default, the framework will rewind the partition and redeliver the failed record up to 9 times; see Handling Exceptions.