Spring for Apache Kafka has a default mechanism of 10 attempts after an error is thrown from a consumed message. I want this mechanism to remain in place, but I also want a way to stop it if the error thrown is the AbortException.class
type.
@KafkaListener(topics = "my-topic", containerFactory = "kafkaListenerContainerFactory")
public void listen(@NotNull ConsumerRecord<String, Object> message) throws IOException {
...
doSomething();
}
If doSomething()
throws the AbortException
exception, I want to prevent any further retries and abort the processing of this consumed message.
@Bean
public DefaultErrorHandler errorHandler() {
BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
return new DefaultErrorHandler((consumerRecord, exception) -> log.error("Error while processing: {}, exception: {}", consumerRecord.topic(), exception.getClass()), fixedBackOff);
}
This handler only runs when all attempts are exhausted and the doSomething()
method couldn't be executed successfully. I want something that runs after every attempt to check which exception is thrown, and if it's an AbortException
, abort the remaining retries.
Is it possible?
See the documentation.
and the javadocs
/**
* Add exception types to the default list. By default, the following exceptions will
* not be retried:
* <ul>
* <li>{@link DeserializationException}</li>
* <li>{@link MessageConversionException}</li>
* <li>{@link ConversionException}</li>
* <li>{@link MethodArgumentResolutionException}</li>
* <li>{@link NoSuchMethodException}</li>
* <li>{@link ClassCastException}</li>
* </ul>
* All others will be retried, unless {@link #defaultFalse()} has been called.
* @param exceptionTypes the exception types.
* @see #removeClassification(Class)
* @see #setClassifications(Map, boolean)
*/
public final void addNotRetryableExceptions(Class<? extends Exception>... exceptionTypes) {
You can also make it even more fine-grained...
/**
* Set a function to dynamically determine the {@link BackOff} to use, based on the
* consumer record and/or exception. If null is returned, the default BackOff will be
* used.
* @param backOffFunction the function.
* @since 2.6
*/
public void setBackOffFunction(BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> backOffFunction) {
on the DefaultErrorHandler
.