KafkaMessageListenerContainer: how to nack on a specific error


I am using KafkaMessageListenerContainer with (KafkaAdapter).

How can I "nack" offsets when a specific error occurs, so that the next poll() fetches them again?

properties.setAckMode(ContainerProperties.AckMode.BATCH);

final KafkaMessageListenerContainer<String, String> kafkaContainer =
        new KafkaMessageListenerContainer<>(consumerFactory, properties);

kafkaContainer.setCommonErrorHandler(new CommonErrorHandler() {

    @Override
    public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
            MessageListenerContainer container, Runnable invokeListener) {
        CommonErrorHandler.super.handleBatch(thrownException, data, consumer, container, invokeListener);
    }

});

Inside handleBatch I detect the exception, and for that specific exception I would like to nack.
I tried throwing a RuntimeException from there.

I am using Spring Boot 2.7.


Solution

  • Use the DefaultErrorHandler - it does exactly that (the whole batch is retried according to the back off). You can classify which exceptions are retryable or not.

    If you throw a BatchListenerFailedException you can specify exactly which record in the batch had the failure and only retry it (and the following records).

    EDIT

    If any other type of exception is thrown, the DefaultErrorHandler falls back to a FallbackBatchErrorHandler, which calls ErrorHandlingUtils.retryBatch(). That method pauses the consumer and redelivers the whole batch without seeking and re-polling (the polls within the loop return no records because the consumer is paused).

    See the documentation. https://docs.spring.io/spring-kafka/docs/current/reference/html/#retrying-batch-eh

    Retrying without a seek is required because there is no guarantee that the batch will be fetched in the same order after a seek, and we need to know the state of the batch (how many times it has been retried). We can't track that if the batch keeps changing; hence the algorithm described above.

    To retry indefinitely you can, for example, use a FixedBackOff with maxAttempts set to Long.MAX_VALUE, or use an ExponentialBackOff with no termination.

    Just be sure that the largest back off (and time to process a batch) is significantly less than max.poll.interval.ms to avoid a rebalance.
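
The advice above might be wired together roughly as follows. This is a minimal sketch under stated assumptions, not a definitive implementation. The class name RetryingContainerFactory, the TransientDownstreamException type, the process() helper, the topic parameter, and the 1-second interval are all hypothetical. It assumes spring-kafka 2.8 or later (the line used by Spring Boot 2.7), where DefaultErrorHandler and setCommonErrorHandler are available.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.BatchListenerFailedException;
import org.springframework.kafka.listener.BatchMessageListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.util.backoff.FixedBackOff;

public class RetryingContainerFactory {

    /** Hypothetical exception standing in for the "specific error" that should cause a redelivery. */
    public static class TransientDownstreamException extends RuntimeException {
        public TransientDownstreamException(String message) {
            super(message);
        }
    }

    public static KafkaMessageListenerContainer<String, String> build(
            ConsumerFactory<String, String> consumerFactory, String topic) {

        ContainerProperties properties = new ContainerProperties(topic);
        properties.setAckMode(ContainerProperties.AckMode.BATCH);

        // Batch listener: on the specific error, report the index of the failed record
        // so that only that record and the ones after it are retried.
        properties.setMessageListener((BatchMessageListener<String, String>) records -> {
            for (int i = 0; i < records.size(); i++) {
                try {
                    process(records.get(i));
                } catch (TransientDownstreamException e) {
                    throw new BatchListenerFailedException("Transient failure, retry from here", e, i);
                }
            }
        });

        // Retry indefinitely with a fixed 1-second pause (assumed value). Keep the interval plus
        // batch processing time well below max.poll.interval.ms (Kafka default: 300000 ms).
        DefaultErrorHandler errorHandler =
                new DefaultErrorHandler(new FixedBackOff(1_000L, FixedBackOff.UNLIMITED_ATTEMPTS));

        // Example classification: treat IllegalArgumentException as not worth retrying.
        errorHandler.addNotRetryableExceptions(IllegalArgumentException.class);

        KafkaMessageListenerContainer<String, String> container =
                new KafkaMessageListenerContainer<>(consumerFactory, properties);
        container.setCommonErrorHandler(errorHandler);
        return container;
    }

    /** Hypothetical business logic; throws TransientDownstreamException when a retry is wanted. */
    private static void process(ConsumerRecord<String, String> record) {
        if (record.value() == null) {
            throw new TransientDownstreamException("No payload yet for offset " + record.offset());
        }
    }
}
```

With a setup along these lines, no custom handleBatch override should be needed: throwing from the listener acts as the "nack". As far as I understand the handler's behavior, offsets for records before the reported index are committed, and the failed record and the ones after it are redelivered after the back off.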