Tags: java, spring, apache-kafka, messaging

KafkaMessageListenerContainer vs ConcurrentMessageListenerContainer


If I use the spring.kafka.listener.concurrency property, will Spring use KafkaMessageListenerContainer or ConcurrentMessageListenerContainer? Note that I have not defined any beans explicitly; I leave the required bean creation to Spring Boot.

I want to know whether my understanding is correct. I want parallelism when consuming messages from a topic that has multiple partitions, say 3. Is it fine to configure it in application.yml as follows? I tried the configuration below and saw that 3 consumers were created.

spring:
   kafka:
     consumer:
        group-id: group-id
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
     producer:
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
     listener:
        concurrency: 3 

So my question is: do I have to create a bean to configure ConcurrentMessageListenerContainer, or will the listener configuration above internally use ConcurrentMessageListenerContainer instead of KafkaMessageListenerContainer when it sees listener.concurrency=3?


Solution

  • spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.
    

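    Concurrency is a property of ConcurrentMessageListenerContainer. Spring Boot's auto-configuration registers a ConcurrentKafkaListenerContainerFactory, which creates ConcurrentMessageListenerContainer instances, and spring.kafka.listener.concurrency sets their concurrency. So defining this property in application.yml is enough; you do not need to declare a container bean yourself.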

    In Kafka, only one consumer within a consumer group is allowed to read from a given partition at a time. A KafkaMessageListenerContainer provides only a single-threaded message listener consumer.
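
    To make the one-consumer-per-partition rule concrete, here is a minimal sketch in plain Java of how a fixed number of partitions could be split among the consumers of one group. The class and method names are hypothetical, and the split only mirrors the idea of a range-style assignment for a single topic; it is not Kafka's actual implementation.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Simplified, hypothetical model of range-style assignment for ONE topic.
    final class PartitionAssignmentSketch {

        // Returns, for each consumer index, the partition numbers it would own.
        static List<List<Integer>> assign(int partitions, int consumers) {
            if (partitions < 0 || consumers <= 0) {
                throw new IllegalArgumentException("partitions must be >= 0 and consumers > 0");
            }
            List<List<Integer>> result = new ArrayList<>();
            int base = partitions / consumers;   // every consumer gets at least this many
            int extra = partitions % consumers;  // the first 'extra' consumers get one more
            int next = 0;
            for (int c = 0; c < consumers; c++) {
                int count = base + (c < extra ? 1 : 0);
                List<Integer> owned = new ArrayList<>();
                for (int i = 0; i < count; i++) {
                    owned.add(next++);           // each partition is handed out exactly once
                }
                result.add(owned);
            }
            return result;
        }

        public static void main(String[] args) {
            System.out.println(assign(3, 3)); // one partition per consumer
            System.out.println(assign(3, 5)); // two consumers own nothing
            System.out.println(assign(3, 1)); // a single consumer owns everything
        }
    }
    ```

    Under this model, 3 partitions and 3 consumers give each consumer exactly one partition, while any consumers beyond the partition count own nothing, so a concurrency higher than the number of partitions would not add parallelism.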

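    A ConcurrentMessageListenerContainer creates as many KafkaMessageListenerContainer instances as the concurrency property in application.yml specifies. With 3 partitions and concurrency 3, each of those containers' consumers gets one partition, which matches the 3 consumers you observed.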
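
    The delegation described above can be pictured with a small plain-Java model: a parent object that starts one single-threaded child per unit of concurrency. This is only a sketch under stated assumptions. ConcurrentContainerSketch and its run method are hypothetical names, not Spring Kafka APIs, and the way partitions are sliced here merely stands in for the assignment that Kafka's group coordination performs.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical stand-in for a concurrent container; not a Spring Kafka class.
    final class ConcurrentContainerSketch {

        private final int concurrency;

        ConcurrentContainerSketch(int concurrency) {
            if (concurrency <= 0) {
                throw new IllegalArgumentException("concurrency must be > 0");
            }
            this.concurrency = concurrency;
        }

        // Starts one single-threaded child per unit of concurrency and returns,
        // for each partition, the name of the child thread that handled it.
        Map<Integer, String> run(int partitions) {
            Map<Integer, String> handledBy = new ConcurrentHashMap<>();
            List<Thread> children = new ArrayList<>();
            for (int c = 0; c < concurrency; c++) {
                // Assumption: child c takes partitions c, c + concurrency, ...
                List<Integer> slice = new ArrayList<>();
                for (int p = c; p < partitions; p += concurrency) {
                    slice.add(p);
                }
                Thread child = new Thread(() -> {
                    for (int p : slice) {
                        handledBy.put(p, Thread.currentThread().getName());
                    }
                }, "consumer-" + c);
                children.add(child);
            }
            for (Thread t : children) {
                t.start();
            }
            for (Thread t : children) {
                try {
                    t.join(); // wait until every child has finished its slice
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for children", e);
                }
            }
            return handledBy;
        }
    }
    ```

    Calling new ConcurrentContainerSketch(3).run(3) in this model maps each of the three partitions to a different child thread, whereas a concurrency of 1 maps all of them to the same thread, which is the single-container case.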

    We can get similar behavior by defining the listener container factory in Java code.

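    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.env.Environment;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

    @Configuration
    @EnableKafka
    public class KafkaListenerConfiguration {
    
        @Autowired
        private Environment environment;
    
        // Factory that builds a ConcurrentMessageListenerContainer for each @KafkaListener.
        @Bean
        public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            // Same effect as spring.kafka.listener.concurrency: 3
            factory.setConcurrency(3);
            // Batch mode: listener methods then receive a List of records.
            // Remove this line to keep the default single-record mode used by the YAML above.
            factory.setBatchListener(true);
            return factory;
        }
    
        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
    
        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("spring.kafka.bootstrap-servers"));
            // The group id is hard-coded here; the YAML above uses "group-id" instead.
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "1111");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            return props;
        }
    }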
    

    For a practical example of the same, see the link below.

    https://howtoprogram.xyz/2016/09/25/spring-kafka-multi-threaded-message-consumption/