Search code examples
javaspringapache-kafkaspring-kafka

Is kafka container factory a requirement in Spring Kafka?


I have a simple consumer in Spring working. I have a config class defined with a bunch of factories, etc. When I remove the config class, the consumer still works. I'm wondering the benefit of having the factory, ie:

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,
            GenericRecord> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        return factory;
    }

    public ConsumerFactory<String, GenericRecord> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(retrieveConsumerConfigs());
    }

and now just passing vals in via application properties and calling it a day. I have explicit control over the config in the class-based approach, but was also thinking I could drop the class and have the vals be available through the spring env variables like spring.kafka.bootstrapservers, for example.


Solution

  • The container factory is required for @KafkaListener methods.

    Spring Boot will auto-configure one (from application.properties/yml) if you don't provide your own bean. See KafkaAutoConfiguration.

    Boot will also configure the consumer factory (if you don't).

    An application, typically, does not need to declare any infrastructure beans.

    EDIT

    I prefer to never declare my own infrastructure beans. If I need some feature that is not exposed as a Boot property, or where I want to override some property for just one container, I simply add a customizer bean.

    @Component
    class Customizer {
    
        public Customizer(ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
    
            factory.setContainerCustomizer(container -> {
                if (container.getContainerProperties().getGroupId().equals("slowGroup")) {
                    container.getContainerProperties().setIdleBetweenPolls(60_000);
                }
            });
        }
    
    }
    

    or

    @Component
    class Customizer {
    
        Customizer(AbstractKafkaListenerContainerFactory<?, ?, ?> containerFactory,
                ThreadPoolTaskExecutor exec) {
    
            containerFactory.getContainerProperties().setConsumerTaskExecutor(exec);
        }
    
    }
    

    etc.