Search code examples
javaspringspring-bootspring-kafka

Kafka consumer retry based on specific exceptions


I have a kafka listener/consumer which consumes from a topic "xyz" that has 10 partitions.

During message consumption or processing there might occur several exceptions. I would like to account for these exceptions. There are a certain exceptions which I would like to retry for a finite number of times.

I wrote the following code to implement this requirement

@Bean
public CommonErrorHandler customErrorHandler(){
       
      DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler((rec,ex) -> 
       log.error("Finished retries. Commiting offset"), new FixedBackOff(3000l, 5));
       defaultErrorHandler.setCommitRecovered(true);
       defaultErrorHandler.addRetryableExceptions(SerializationException.class)
       return defaultErrorHandler;
     
    }

The above code retries consuming the same message for 5 times upon failure to consume/process with an interval of 3 seconds between each retry.

Springboot should pick up this bean and wire it into the auto configured Listener Container factory as per the docs.

My questions are:

  1. Is it possible to add more generic exception as a retryable exception instead of a specific one. For ex: I would like to use the instanceOf operator in JAVA to check if an exception thrown is of a certain type, and then configure the retries or retry policies. Is such a thing possible ?

  2. Can we retry infinitely for certain exceptions?

Note: I have implemented the exact same thing in earlier versions of Springboot. We have upgraded to 2.7 and I am looking for any examples or samples to achieve this task. Please help me with links to any examples or with any sample code.

  1. This question is unrelated to the above problem and its regarding the kafka consumer's "max.poll.records" property. The scenario is, we have one consumer in a consumer group, that is consuming messages from a topic with 10 partitions and all partitions have messages in them. If the "max.poll.records" is set to 1, does the consumer receive exactly one message per partition in the topic, bringing the cumulative messages consumed from all 10 partitions to ten? Is my understanding correct? Or does it poll only 1 record irrespective of the partitions?

Solution

  • See the javadocs; by default all exceptions are retryable except for a configured set of exceptions that are considered fatal. You can modify or replace that set:

    /**
     * Add exception types to the default list. By default, the following exceptions will
     * not be retried:
     * <ul>
     * <li>{@link DeserializationException}</li>
     * <li>{@link MessageConversionException}</li>
     * <li>{@link ConversionException}</li>
     * <li>{@link MethodArgumentResolutionException}</li>
     * <li>{@link NoSuchMethodException}</li>
     * <li>{@link ClassCastException}</li>
     * </ul>
     * All others will be retried, unless {@link #defaultFalse()} has been called.
     * @param exceptionTypes the exception types.
     * @see #removeClassification(Class)
     * @see #setClassifications(Map, boolean)
     */
    @SafeVarargs
    @SuppressWarnings("varargs")
    public final void addNotRetryableExceptions(Class<? extends Exception>... exceptionTypes) {
    

    You don't need to add retryable exceptions, unless you have called defaultFalse().

    For complete control over back off selection, you can provide a back off function:

    /**
     * Set a function to dynamically determine the {@link BackOff} to use, based on the
     * consumer record and/or exception. If null is returned, the default BackOff will be
     * used.
     * @param backOffFunction the function.
     * @since 2.6
     */
    public void setBackOffFunction(BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> backOffFunction) {
    

    It is best NOT to ask unrelated questions - ask separate questions.

    You will get 10 records but it is indeterminate which partitions they will come from; you will definitely not get 1 record from each partition on each poll.

    EDIT

    To change the default "fatal" classifications, you can either remove individual types:

    /**
     * Remove an exception type from the configured list. By default, the following
     * exceptions will not be retried:
     * <ul>
     * <li>{@link DeserializationException}</li>
     * <li>{@link MessageConversionException}</li>
     * <li>{@link ConversionException}</li>
     * <li>{@link MethodArgumentResolutionException}</li>
     * <li>{@link NoSuchMethodException}</li>
     * <li>{@link ClassCastException}</li>
     * </ul>
     * All others will be retried, unless {@link #defaultFalse()} has been called.
     * @param exceptionType the exception type.
     * @return the classification of the exception if removal was successful;
     * null otherwise.
     * @since 2.8.4
     * @see #addNotRetryableExceptions(Class...)
     * @see #setClassifications(Map, boolean)
     */
    @Nullable
    public Boolean removeClassification(Class<? extends Exception> exceptionType) {
    

    Or you can replace the defaults in one call:

    /**
     * Set an exception classifications to determine whether the exception should cause a retry
     * (until exhaustion) or not. If not, we go straight to the recoverer. By default,
     * the following exceptions will not be retried:
     * <ul>
     * <li>{@link DeserializationException}</li>
     * <li>{@link MessageConversionException}</li>
     * <li>{@link ConversionException}</li>
     * <li>{@link MethodArgumentResolutionException}</li>
     * <li>{@link NoSuchMethodException}</li>
     * <li>{@link ClassCastException}</li>
     * </ul>
     * All others will be retried.
     * When calling this method, the defaults will not be applied.
     * @param classifications the classifications.
     * @param defaultValue whether or not to retry non-matching exceptions.
     * @see BinaryExceptionClassifier#BinaryExceptionClassifier(Map, boolean)
     * @see #addNotRetryableExceptions(Class...)
     */
    public void setClassifications(Map<Class<? extends Throwable>, Boolean> classifications, boolean defaultValue) {