Search code examples
spring-kafka

KafkaListenerContainerFactory not getting created properly


I have two listener container factories one for main topic and another for retry topic as given below

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> primaryKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(primaryConsumerFactory());
    factory.setConcurrency(3);
    factory.setAutoStartup(false);
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(AckMode.RECORD);        
    errorHandler.setAckAfterHandle(true);
    factory.setErrorHandler(errorHandler);
    return factory;
}

@Bean
public ConsumerFactory<String, Object> primaryConsumerFactory() {
    Map<String, Object> map = new HashMap<>();
    Properties consumerProperties = getConsumerProperties();
    consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "groupid");
    consumerProperties.forEach((key, value) -> map.put((String) key, value));
    ErrorHandlingDeserializer2<Object> errorHandlingDeserializer = new ErrorHandlingDeserializer2<>(
            getSoapMessageConverter());
    DefaultKafkaConsumerFactory<String, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(map);
    consumerFactory.setValueDeserializer(errorHandlingDeserializer);
    return consumerFactory;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaRetryListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(retryConsumerFactory());
    factory.setConcurrency(3);
    factory.setAutoStartup(false);
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.setErrorHandler(new SeekToCurrentErrorHandler(
            new MyDeadLetterPublishingRecoverer("mytopic",
                    deadLetterKafkaTemplate()),
            new FixedBackOff(5000, 2)));
    return factory;
}


@Bean
public ConsumerFactory<String, Object> retryConsumerFactory() {
    Map<String, Object> map = new HashMap<>();
    Properties consumerProperties = getConsumerProperties();
    consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "retry.id");
    consumerProperties.put("max.poll.interval.ms", "60000");
    consumerProperties.forEach((key, value) -> map.put((String) key, value));
    DefaultKafkaConsumerFactory<String, Object> retryConsumerFactory = new DefaultKafkaConsumerFactory<>(map);
    retryConsumerFactory.setValueDeserializer(getCustomMessageConverter());
    return retryConsumerFactory;
}

I have two separate listener classes which uses each of the aforementioned containers

There are two issues here

  1. Spring complains about - Error creating bean with name 'kafkaListenerContainerFactory' defined Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ConsumerFactory' available: expected at least 1 bean which qualifies as autowire candidate.

To Fix this I have to rename primaryKafkaListenerContainerFactory to kafkaListenerContainerFactory. Why this is so?

  1. Second issue is kafkaRetryListenerContainerFactory is not seems to be taking whatever properties I try to set in retryConsumerFactory.(Especially "max.poll.interval.ms") instead it uses the properties set on primaryConsumerFactory in kafkaListenerContainerFactory

Solution

  • To Fix this I have to rename primaryKafkaListenerContainerFactory to kafkaListenerContainerFactory. Why this is so?

    That is correct, kafkaListenerContainerFactory is the default name when no containerProperty is on the listener and Boot will try to auto-configure it.

    You should name one of your custom factory with that name to override the Boot's auto configuration because you have an incompatible consumer factory.

    Your second question makes no sense to me.

    Perhaps your getConsumerProperties() is returning the same object each time - you need a copy.

    When asking questions like this, it's best to show all the relevant code.