I am using KafkaMessageListenerContainer (with KafkaAdapter).
How can I "nack" offsets in case of a specific error, so that the next poll() returns them again?
```java
properties.setAckMode(ContainerProperties.AckMode.BATCH);
final KafkaMessageListenerContainer<String, String> kafkaContainer =
        new KafkaMessageListenerContainer<>(consumerFactory, properties);
kafkaContainer.setCommonErrorHandler(new CommonErrorHandler() {
    @Override
    public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
            MessageListenerContainer container, Runnable invokeListener) {
        // The specific exception is detected here; for it, the batch should be "nacked".
        CommonErrorHandler.super.handleBatch(thrownException, data, consumer, container, invokeListener);
    }
});
```
Inside handleBatch I detect the exception; for that specific exception I would like to nack the batch. I tried throwing a RuntimeException from there.
I am using Spring Boot 2.7.
Use the DefaultErrorHandler; it does exactly that (the whole batch is retried according to the back off). You can also classify which exceptions are retryable and which are not.
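A minimal sketch of that configuration, assuming spring-kafka 2.8.x (the version managed by Spring Boot 2.7) and reusing the consumerFactory and properties objects from the question. RetryingContainerFactory and PoisonPillException are hypothetical names used only for illustration. Whether the not-retryable classification is also applied when the whole batch is retried (as opposed to a single record identified by a BatchListenerFailedException) may depend on the spring-kafka version, so check the reference documentation for the version you use.

```java
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.util.backoff.FixedBackOff;

public final class RetryingContainerFactory {

    /** Hypothetical exception type that should never be retried. */
    public static class PoisonPillException extends RuntimeException {
        public PoisonPillException(String message) {
            super(message);
        }
    }

    private RetryingContainerFactory() {
    }

    public static KafkaMessageListenerContainer<String, String> create(
            ConsumerFactory<String, String> consumerFactory, ContainerProperties properties) {
        properties.setAckMode(ContainerProperties.AckMode.BATCH);
        KafkaMessageListenerContainer<String, String> kafkaContainer =
                new KafkaMessageListenerContainer<>(consumerFactory, properties);

        // After a failed delivery, retry up to 3 more times, 1 second apart;
        // when retries are exhausted, the default recoverer only logs the failed records.
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(new FixedBackOff(1_000L, 3L));

        // Classify this exception as not retryable (see the version caveat above).
        errorHandler.addNotRetryableExceptions(PoisonPillException.class);

        // Replaces the custom CommonErrorHandler from the question.
        kafkaContainer.setCommonErrorHandler(errorHandler);
        return kafkaContainer;
    }
}
```

No custom handleBatch override is needed; the DefaultErrorHandler decides between retrying and recovering based on the exception thrown by the listener.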
If your listener throws a BatchListenerFailedException, you can specify exactly which record in the batch failed, and only that record (and the records after it) will be retried.
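A sketch of a batch listener that reports the index of the failing record, assuming spring-kafka's BatchMessageListener interface and the BatchListenerFailedException(String, Throwable, int) constructor. The class name and the process() method with its failure condition are hypothetical placeholders for real business logic.

```java
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.BatchListenerFailedException;
import org.springframework.kafka.listener.BatchMessageListener;

public class IndexReportingBatchListener implements BatchMessageListener<String, String> {

    @Override
    public void onMessage(List<ConsumerRecord<String, String>> records) {
        for (int i = 0; i < records.size(); i++) {
            ConsumerRecord<String, String> record = records.get(i);
            try {
                process(record);
            } catch (RuntimeException ex) {
                // Tell the DefaultErrorHandler which record failed: the records before index i
                // are treated as processed, and record i plus the following ones are redelivered.
                throw new BatchListenerFailedException("Failed at offset " + record.offset(), ex, i);
            }
        }
    }

    // Hypothetical business logic; replace with your own processing.
    private void process(ConsumerRecord<String, String> record) {
        if (record.value() == null) {
            throw new IllegalStateException("null value");
        }
    }
}
```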
EDIT
If any other type of exception is thrown, the DefaultErrorHandler falls back to a FallbackBatchErrorHandler, which calls ErrorHandlingUtils.retryBatch(). That method pauses the consumer and redelivers the whole batch from memory, without seeking and re-polling (the polls within the retry loop return no records because the consumer is paused).
This is required because we need to know the state of the batch (how many times it has been retried), and there is no guarantee that the same batch will be fetched in the same order after a seek. We can't track that state if the batch keeps changing; hence the algorithm described above.
See the documentation: https://docs.spring.io/spring-kafka/docs/current/reference/html/#retrying-batch-eh
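To make that reasoning concrete, here is a deliberately simplified, framework-free model of the loop in plain Java. It is not the spring-kafka implementation: BatchRetrySketch, retryBatch, the pausedWait callback (standing in for "consumer paused, poll() returns no records, sleep for the back off") and the recoverer callback are all hypothetical. What it shows is that counting retries only makes sense because the same in-memory batch is redelivered on every attempt.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongConsumer;

/** Simplified model of whole-batch retry; NOT the spring-kafka implementation. */
public final class BatchRetrySketch {

    private BatchRetrySketch() {
    }

    /**
     * Delivers the same in-memory batch to the listener until it succeeds or the
     * back-off intervals are used up, then hands the batch to the recoverer.
     *
     * @return the total number of listener invocations (initial delivery plus retries)
     */
    public static <T> int retryBatch(List<T> batch,
                                     Consumer<List<T>> listener,
                                     long[] backOffIntervalsMs,
                                     LongConsumer pausedWait,
                                     Consumer<List<T>> recoverer) {
        int attempts = 0;
        for (int retry = 0; ; retry++) {
            attempts++;
            try {
                listener.accept(batch); // always the same records, in the same order
                return attempts;
            } catch (RuntimeException ex) {
                if (retry >= backOffIntervalsMs.length) {
                    recoverer.accept(batch); // retries exhausted
                    return attempts;
                }
                // Stands in for: consumer paused, poll() returns no records, sleep for the back off.
                pausedWait.accept(backOffIntervalsMs[retry]);
            }
        }
    }
}
```

Because the batch never changes between attempts, "attempt N of the batch" is well defined; if the records were re-fetched after a seek, the batch could differ and the count would no longer refer to the same set of records.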
To retry indefinitely you can, for example, use a FixedBackOff with Long.MAX_VALUE in the maxAttempts property, or use an ExponentialBackOff with no termination.
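For example, assuming the Spring Framework 5.3 org.springframework.util.backoff classes used with Spring Boot 2.7, the two options could look like this. InfiniteRetryHandlers is a hypothetical class name, and the interval values are illustrative only.

```java
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.ExponentialBackOff;
import org.springframework.util.backoff.FixedBackOff;

public final class InfiniteRetryHandlers {

    private InfiniteRetryHandlers() {
    }

    /** Retries forever, waiting 5 seconds between attempts. */
    public static DefaultErrorHandler fixedForever() {
        // FixedBackOff.UNLIMITED_ATTEMPTS is Long.MAX_VALUE.
        return new DefaultErrorHandler(new FixedBackOff(5_000L, FixedBackOff.UNLIMITED_ATTEMPTS));
    }

    /** Retries forever, doubling the wait from 1 second up to a 60 second cap. */
    public static DefaultErrorHandler exponentialForever() {
        ExponentialBackOff backOff = new ExponentialBackOff(1_000L, 2.0);
        backOff.setMaxInterval(60_000L);
        // No termination: allow an unbounded total elapsed time.
        backOff.setMaxElapsedTime(Long.MAX_VALUE);
        return new DefaultErrorHandler(backOff);
    }
}
```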
Just make sure that the largest back off plus the time to process a batch is significantly less than max.poll.interval.ms; otherwise the consumer is considered failed and a rebalance occurs.
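A back-of-the-envelope check of that constraint, written as a hypothetical helper. During a whole-batch retry the container waits for one back off and then runs the listener before polling again, so their sum is roughly the longest gap between polls. The "at most half of max.poll.interval.ms" margin is an assumed rule of thumb, not something spring-kafka enforces; 300000 ms is Kafka's default max.poll.interval.ms.

```java
public final class PollIntervalBudget {

    /** Kafka's default value of max.poll.interval.ms (5 minutes). */
    public static final long DEFAULT_MAX_POLL_INTERVAL_MS = 300_000L;

    private PollIntervalBudget() {
    }

    /**
     * Hypothetical rule of thumb: the longest back off plus the worst-case time to process
     * one batch (approximately the longest gap between polls during a batch retry) should
     * use at most half of max.poll.interval.ms.
     */
    public static boolean leavesHeadroom(long largestBackOffMs, long worstBatchProcessingMs,
                                         long maxPollIntervalMs) {
        long worstGapBetweenPolls = largestBackOffMs + worstBatchProcessingMs;
        return worstGapBetweenPolls * 2 <= maxPollIntervalMs;
    }
}
```

With an ExponentialBackOff that never terminates, the largest back off is its maxInterval, so that is the value to plug in.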