Error handling with @KafkaListener

I'm using spring-kafka with the following configuration:

package com.danigu.fancypants.infrastructure;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;

import javax.inject.Inject;
import java.util.HashMap;
import java.util.Map;

 * @author dani
public class KafkaConfiguration {
    @Inject KafkaConfigurationProperties kcp;

    protected Map<String, Object> consumerProperties() {
        Map<String, Object> props = new HashMap();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kcp.getBrokerAddress());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kcp.getGroupId());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProperties());

    public StringJsonMessageConverter stringJsonMessageConverter(ObjectMapper mapper) {
        return new StringJsonMessageConverter(mapper);

    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            StringJsonMessageConverter messageConverter) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();


        return factory;

     * Retry template.

    protected RetryPolicy retryPolicy() {
        SimpleRetryPolicy policy = new SimpleRetryPolicy();
        return policy;

    protected BackOffPolicy backOffPolicy() {
        ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
        return policy;

    protected RetryTemplate retryTemplate() {
       RetryTemplate template = new RetryTemplate();


       return template;

And my listener looks like this:

package com.danigu.fancypants.integration.inbound.dress;

import com.danigu.fancypants.integration.inbound.InvalidRequestException;
import com.danigu.fancypants.integration.inbound.dress.payload.DressRequest;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import javax.inject.Inject;
import javax.validation.ConstraintViolation;
import javax.validation.Validator;
import java.util.Set;

 * @author dani
public class DressListener {

    @Inject protected Validator validator;

    @KafkaListener(topics = {"${kafka.dressesTopic}"})
    public void onMessage(@Payload DressRequest request, Acknowledgment acknowledgment) {



    protected void assertValidRequest(DressRequest request) {
        final Set<ConstraintViolation<DressRequest>> violations = validator.validate(request);

        if(!violations.isEmpty()) {
            throw new InvalidRequestException(violations, request);

So far i've been looking at the tests and reference documentation of spring-kafka, there the docs say that the ErrorHandler for the appropriate type should be configured, this test imply that i should configure it on ContainerProperties, although, that's only one error handler, in my use case, i would like to define multiple (for different payload types), is that possible, in case yes, how?

Also, is there a way maybe to describe which error handler to use on the annotated listener void?

Also, is there a way to describe a RecoveryCallback per @KafkaListener or maybe per different topics or there has to be different ListenerContainerFactorys for that?

I might get this completely wrong, could someone point me in the right direction how i could configure multiple ErrorHandlers for different payload types the right way please?


  • I am not sure what you mean by "different payload types" since you only have a single @KafkaListener. @KafkaListener at the class level can have @KafkaHandler at the method level for different payload types.

    In any case, there is only one error handler per container so you would need a different container factory for each error handler (same thing for the recovery callback).

    We recently added an errorHandler on the @RabbitListener in spring-amqp ...

     * Set an {@link RabbitListenerErrorHandler} to invoke if the listener method throws
     * an exception.
     * @return the error handler.
     * @since 2.0
    String errorHandler() default "";
 each method can have its own error handler there.

    We will probably do something similar for the next release of spring-kafka. But it would still only be one for each @KafkaListener so it won't help for class-level @KafkaListeners.