spring-boot apache-kafka messaging spring-kafka

Spring Kafka and number of topic consumers


In my Spring Boot/Kafka project I have the following consumer config:

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(String.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(kafkaProperties));
        factory.setConcurrency(10);    
        return factory;
    }

    @Bean
    public ConsumerFactory<String, Post> postConsumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(Post.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

        ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(postConsumerFactory(kafkaProperties));

        return factory;
    }

}

This is my PostConsumer:

@Component
public class PostConsumer {

    @Autowired
    private PostService postService;

    @KafkaListener(topics = "${kafka.topic.post.send}", containerFactory = "postKafkaListenerContainerFactory")
    public void sendPost(ConsumerRecord<String, Post> consumerRecord) {

        postService.sendPost(consumerRecord.value());

    }

}

and the application.properties:

spring.kafka.bootstrap-servers=${kafka.host}:${kafka.port}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=groupname
spring.kafka.consumer.enable-auto-commit=false
kafka.topic.post.send=post.send
kafka.topic.post.sent=post.sent
kafka.topic.post.error=post.error

As you can see, I have added factory.setConcurrency(10); but it has no effect. Every call to PostConsumer.sendPost executes on the same thread, named org.springframework.kafka.KafkaListenerEndpointContainer#1-8-C-1

I'd like to control the number of concurrent PostConsumer.sendPost listeners so that records are processed in parallel. How can this be achieved with Spring Boot and Spring Kafka?


Solution

  • The cause is the consistency that Spring Kafka maintains on top of the Apache Kafka Consumer: concurrency is distributed across the partitions of the topics provided. If you have only one topic with a single partition in it, then there is indeed not going to be any concurrency. The point is that all the records from one partition are consumed in the same thread, so they are processed in order.

    There is some info on the matter in the Docs: https://docs.spring.io/spring-kafka/docs/2.1.7.RELEASE/reference/html/_reference.html#_concurrentmessagelistenercontainer

    If, say, 6 TopicPartitions are provided and the concurrency is 3; each container will get 2 partitions. For 5 TopicPartitions, 2 containers will get 2 partitions and the third will get 1. If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition.
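The arithmetic in that passage can be sketched in plain Java. This is only a model of the documented behaviour, not Spring Kafka's own code: the class name PartitionDistribution and the method distribute are hypothetical, and which container receives the extra partition is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionDistribution {

    /**
     * Returns the number of partitions each container would receive.
     * The size of the returned list is the effective concurrency.
     * (Hypothetical helper; it only models the rule quoted from the docs.)
     */
    public static List<Integer> distribute(int partitions, int concurrency) {
        if (partitions < 1 || concurrency < 1) {
            throw new IllegalArgumentException("partitions and concurrency must be positive");
        }
        // Concurrency is adjusted down when it exceeds the partition count.
        int containers = Math.min(concurrency, partitions);
        int base = partitions / containers;
        int remainder = partitions % containers;

        List<Integer> perContainer = new ArrayList<>();
        for (int i = 0; i < containers; i++) {
            // Assumption: the first `remainder` containers take one extra partition.
            perContainer.add(i < remainder ? base + 1 : base);
        }
        return perContainer;
    }

    public static void main(String[] args) {
        System.out.println(distribute(6, 3));  // [2, 2, 2]
        System.out.println(distribute(5, 3));  // [2, 2, 1]
        System.out.println(distribute(1, 10)); // [1]
    }
}
```

The last call mirrors the situation in the question: with a single partition, a concurrency of 10 still yields one container and therefore one listener thread. Note, too, that in the question setConcurrency(10) is called on kafkaListenerContainerFactory, whereas the listener names postKafkaListenerContainerFactory, which is a separate bean with no concurrency set. So it is probably worth setting the concurrency on the factory the listener actually uses, in addition to giving the topic enough partitions.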

    The JavaDocs for setConcurrency say the same:

    /**
     * The maximum number of concurrent {@link KafkaMessageListenerContainer}s running.
     * Messages from within the same partition will be processed sequentially.
     * @param concurrency the concurrency.
     */
    public void setConcurrency(int concurrency) {