I have a Kafka listener that consumes from a topic "xyz" with 10 partitions.
Several exceptions can occur while a message is being consumed or processed, and I would like to handle them. Some of these exceptions should be retried a finite number of times.
I wrote the following code to implement this requirement:
@Bean
public CommonErrorHandler customErrorHandler() {
    // The recoverer runs once retries are exhausted; FixedBackOff(3000L, 5) means 5 retries, 3 seconds apart.
    DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(
            (rec, ex) -> log.error("Finished retries. Committing offset"),
            new FixedBackOff(3000L, 5));
    defaultErrorHandler.setCommitRecovered(true);
    defaultErrorHandler.addRetryableExceptions(SerializationException.class);
    return defaultErrorHandler;
}
The above code retries the same message up to 5 times after a failure to consume or process it, with an interval of 3 seconds between retries.
Spring Boot should pick up this bean and wire it into the auto-configured listener container factory, as per the docs.
My questions are:
Is it possible to register a more generic exception as retryable instead of a specific one? For example, I would like to use the instanceof operator in Java to check whether a thrown exception is of a certain type, and then configure the retries or retry policy accordingly. Is that possible?
Can we retry infinitely for certain exceptions?
Note: I have implemented the exact same thing in earlier versions of Spring Boot. We have upgraded to 2.7 and I am looking for examples of how to achieve this. Please point me to any examples or sample code.
See the javadocs; by default all exceptions are retryable except for a configured set of exceptions that are considered fatal. You can modify or replace that set:
/**
* Add exception types to the default list. By default, the following exceptions will
* not be retried:
* <ul>
* <li>{@link DeserializationException}</li>
* <li>{@link MessageConversionException}</li>
* <li>{@link ConversionException}</li>
* <li>{@link MethodArgumentResolutionException}</li>
* <li>{@link NoSuchMethodException}</li>
* <li>{@link ClassCastException}</li>
* </ul>
* All others will be retried, unless {@link #defaultFalse()} has been called.
* @param exceptionTypes the exception types.
* @see #removeClassification(Class)
* @see #setClassifications(Map, boolean)
*/
@SafeVarargs
@SuppressWarnings("varargs")
public final void addNotRetryableExceptions(Class<? extends Exception>... exceptionTypes) {
You don't need to add retryable exceptions unless you have called defaultFalse().
For complete control over back off selection, you can provide a back off function:
/**
* Set a function to dynamically determine the {@link BackOff} to use, based on the
* consumer record and/or exception. If null is returned, the default BackOff will be
* used.
* @param backOffFunction the function.
* @since 2.6
*/
public void setBackOffFunction(BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> backOffFunction) {
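As an illustration, the instanceof check from the question could live in such a function. The sketch below is plain Java so that it runs without Spring on the classpath; the names BackOffSelection, Policy and select, as well as the exception types and intervals chosen, are hypothetical. The wiring into setBackOffFunction is shown only as a comment. It walks the cause chain on the assumption that the listener's exception may arrive wrapped (for example in a ListenerExecutionFailedException).

```java
import java.io.IOException;

// Hypothetical helper: picks retry settings from the exception type.
final class BackOffSelection {

    /** Interval/attempts pair; intended to map onto new FixedBackOff(intervalMs, maxAttempts). */
    static final class Policy {
        final long intervalMs;
        final long maxAttempts;

        Policy(long intervalMs, long maxAttempts) {
            this.intervalMs = intervalMs;
            this.maxAttempts = maxAttempts;
        }
    }

    // Same value as FixedBackOff.UNLIMITED_ATTEMPTS (Long.MAX_VALUE).
    static final long UNLIMITED = Long.MAX_VALUE;

    /** Returns a policy for known exception types, or null to fall back to the default BackOff. */
    static Policy select(Throwable thrown) {
        // Walk the cause chain, since the original exception may be wrapped.
        for (Throwable t = thrown; t != null; t = t.getCause()) {
            if (t instanceof IOException) {
                return new Policy(5000L, UNLIMITED); // assumed: retry I/O problems indefinitely
            }
            if (t instanceof IllegalStateException) {
                return new Policy(1000L, 2);         // assumed: two quick retries
            }
        }
        return null;
    }

    // Possible wiring (requires spring-kafka, not compiled here):
    // handler.setBackOffFunction((rec, ex) -> {
    //     Policy p = BackOffSelection.select(ex);
    //     return p == null ? null : new FixedBackOff(p.intervalMs, p.maxAttempts);
    // });

    public static void main(String[] args) {
        Policy p = select(new RuntimeException(new IOException("broker unreachable")));
        System.out.println("interval=" + p.intervalMs + " unlimited=" + (p.maxAttempts == UNLIMITED));
    }
}
```

Returning null keeps the handler's default BackOff, as the javadoc above describes, and the UNLIMITED value is one way to get the infinite retries asked about in the second question.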
It is best NOT to ask unrelated questions - ask separate questions.
You will get 10 records but it is indeterminate which partitions they will come from; you will definitely not get 1 record from each partition on each poll.
EDIT
To change the default "fatal" classifications, you can either remove individual types:
/**
* Remove an exception type from the configured list. By default, the following
* exceptions will not be retried:
* <ul>
* <li>{@link DeserializationException}</li>
* <li>{@link MessageConversionException}</li>
* <li>{@link ConversionException}</li>
* <li>{@link MethodArgumentResolutionException}</li>
* <li>{@link NoSuchMethodException}</li>
* <li>{@link ClassCastException}</li>
* </ul>
* All others will be retried, unless {@link #defaultFalse()} has been called.
* @param exceptionType the exception type.
* @return the classification of the exception if removal was successful;
* null otherwise.
* @since 2.8.4
* @see #addNotRetryableExceptions(Class...)
* @see #setClassifications(Map, boolean)
*/
@Nullable
public Boolean removeClassification(Class<? extends Exception> exceptionType) {
Or you can replace the defaults in one call:
/**
* Set an exception classifications to determine whether the exception should cause a retry
* (until exhaustion) or not. If not, we go straight to the recoverer. By default,
* the following exceptions will not be retried:
* <ul>
* <li>{@link DeserializationException}</li>
* <li>{@link MessageConversionException}</li>
* <li>{@link ConversionException}</li>
* <li>{@link MethodArgumentResolutionException}</li>
* <li>{@link NoSuchMethodException}</li>
* <li>{@link ClassCastException}</li>
* </ul>
* All others will be retried.
* When calling this method, the defaults will not be applied.
* @param classifications the classifications.
* @param defaultValue whether or not to retry non-matching exceptions.
* @see BinaryExceptionClassifier#BinaryExceptionClassifier(Map, boolean)
* @see #addNotRetryableExceptions(Class...)
*/
public void setClassifications(Map<Class<? extends Throwable>, Boolean> classifications, boolean defaultValue) {
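To make the effect of the map and the defaultValue flag concrete, here is a simplified, Spring-free model of class-based classification. It is not the library's implementation: RetryClassifierSketch and shouldRetry are hypothetical names, and the sketch assumes that an entry for a supertype also covers its subclasses and that causes are inspected when the outer exception has no entry.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of a (Map, defaultValue) retry classification; not the spring-kafka code.
final class RetryClassifierSketch {

    private final Map<Class<? extends Throwable>, Boolean> classifications;
    private final boolean defaultValue;

    RetryClassifierSketch(Map<Class<? extends Throwable>, Boolean> classifications, boolean defaultValue) {
        this.classifications = new LinkedHashMap<>(classifications);
        this.defaultValue = defaultValue;
    }

    /** True if the exception should be retried, false if it should go straight to the recoverer. */
    boolean shouldRetry(Throwable thrown) {
        // Outer loop: the exception and its causes. Inner loop: the class and its superclasses.
        for (Throwable t = thrown; t != null; t = t.getCause()) {
            for (Class<?> c = t.getClass(); c != null && c != Object.class; c = c.getSuperclass()) {
                Boolean retry = classifications.get(c);
                if (retry != null) {
                    return retry;
                }
            }
        }
        return defaultValue; // no entry matched
    }

    public static void main(String[] args) {
        Map<Class<? extends Throwable>, Boolean> map = new LinkedHashMap<>();
        map.put(IOException.class, true);               // retry I/O failures and their subclasses
        map.put(IllegalArgumentException.class, false); // treat bad input as fatal
        RetryClassifierSketch classifier = new RetryClassifierSketch(map, false);
        System.out.println(classifier.shouldRetry(new FileNotFoundException("missing")));
        System.out.println(classifier.shouldRetry(new NumberFormatException("NaN")));
    }
}
```

Under those assumptions, mapping a generic supertype to true and passing false as the default would play the role of the instanceof check from the question: only that type and its subclasses would be retried.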