
Parallel processing and auto scaling in spring-kafka KafkaListener


I'm using spring-kafka to consume messages from two Kafka topics that send messages in the same format, with the listener below.

    @KafkaListener(topics = {"topic_country1", "topic_country2"}, groupId = KafkaUtils.MESSAGE_GROUP)
    public void onCustomerMessage(String message, Acknowledgment ack) throws Exception {
        log.info("Message : {}  is received", message);
        ack.acknowledge();
    }
  • Can KafkaListener, on its own, allocate consumer threads according to the number of topics it listens to, and process messages from the two topics in parallel? Or does it not support parallel processing, so that messages have to wait in the topic until the current message has been processed?
  • If the number of messages in the topics grows, I need to autoscale my microservice by starting new instances (up to the number of partitions). Which parameters (CPU, memory) can I rely on, from the KafkaListener's point of view, to detect that the number of messages in the topics is high? (For example, for an HTTP API I can autoscale the service by monitoring the HTTP latency.)

Solution

  • You can set the concurrency property to run more threads, but each partition can only be processed by one thread. To increase concurrency, you must increase the number of partitions in each topic. When listening to multiple topics in the same listener, if those topics have only one partition each, you may not get the concurrency you want unless you change the Kafka consumer's partition assignor.

    See https://docs.spring.io/spring-kafka/docs/2.5.0.RELEASE/reference/html/#using-ConcurrentMessageListenerContainer

    When listening to multiple topics, the default partition distribution may not be what you expect. For example, if you have three topics with five partitions each and you want to use concurrency=15, you see only five active consumers, each assigned one partition from each topic, with the other 10 consumers being idle. This is because the default Kafka PartitionAssignor is the RangeAssignor (see its Javadoc). For this scenario, you may want to consider using the RoundRobinAssignor instead, which distributes the partitions across all of the consumers. Then, each consumer is assigned one topic or partition. ...
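Applied to the listener in the question, a minimal sketch might look like the following. It assumes a spring-kafka version where @KafkaListener has the concurrency attribute and the properties attribute for per-listener consumer overrides (2.2.4 or later, to my understanding), and that the container factory is already configured for manual acknowledgment, since the method takes an Acknowledgment. The class name CustomerMessageListener and the group id value are hypothetical; the group id stands in for KafkaUtils.MESSAGE_GROUP. With one partition per topic, two consumers is the most that can be kept busy.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class CustomerMessageListener {

    private static final Logger log = LoggerFactory.getLogger(CustomerMessageListener.class);

    // Hypothetical group id standing in for KafkaUtils.MESSAGE_GROUP from the question.
    private static final String MESSAGE_GROUP = "customer-message-group";

    @KafkaListener(
            topics = {"topic_country1", "topic_country2"},
            groupId = MESSAGE_GROUP,
            // Run two consumers (one thread each) in this listener's container.
            concurrency = "2",
            // Override the consumer's assignor so partitions from both topics are dealt out
            // round-robin across the consumers instead of range-by-range per topic.
            properties = {"partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor"})
    public void onCustomerMessage(String message, Acknowledgment ack) {
        log.info("Message : {} is received", message);
        // Manual acknowledgment; assumes the container uses AckMode MANUAL or MANUAL_IMMEDIATE.
        ack.acknowledge();
    }
}
```

The same assignment strategy could instead be set once on the consumer factory through ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG. Either way, all instances of the service in the same consumer group should use the same strategy, because group members must agree on a common assignment protocol.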
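To see why the assignor matters, here is a simplified, stdlib-only model of how range-style and round-robin-style assignment distribute partitions. It is not Kafka's actual RangeAssignor or RoundRobinAssignor code. It assumes every consumer subscribes to every topic and that consumers are numbered 0..n-1 in the sorted order the real assignors use. The topic names topicA to topicC are hypothetical, and topic_country1/topic_country2 (from the question) are assumed to have one partition each.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class AssignorSimulation {

    /** Range-style: each topic is split into contiguous ranges over the consumers, one topic at a time. */
    public static Map<Integer, List<String>> range(Map<String, Integer> partitionsPerTopic, int consumers) {
        Map<Integer, List<String>> assignment = emptyAssignment(consumers);
        for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
            int partitions = topic.getValue();
            int perConsumer = partitions / consumers;
            int extra = partitions % consumers; // the first `extra` consumers get one more partition
            for (int c = 0; c < consumers; c++) {
                int start = perConsumer * c + Math.min(c, extra);
                int length = perConsumer + (c < extra ? 1 : 0);
                for (int p = start; p < start + length; p++) {
                    assignment.get(c).add(topic.getKey() + "-" + p);
                }
            }
        }
        return assignment;
    }

    /** Round-robin-style: all topic-partitions are laid out in order and dealt out one at a time. */
    public static Map<Integer, List<String>> roundRobin(Map<String, Integer> partitionsPerTopic, int consumers) {
        Map<Integer, List<String>> assignment = emptyAssignment(consumers);
        int next = 0;
        for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
            for (int p = 0; p < topic.getValue(); p++) {
                assignment.get(next).add(topic.getKey() + "-" + p);
                next = (next + 1) % consumers;
            }
        }
        return assignment;
    }

    /** Counts consumers that received at least one partition. */
    public static long activeConsumers(Map<Integer, List<String>> assignment) {
        return assignment.values().stream().filter(list -> !list.isEmpty()).count();
    }

    private static Map<Integer, List<String>> emptyAssignment(int consumers) {
        Map<Integer, List<String>> assignment = new TreeMap<>();
        for (int c = 0; c < consumers; c++) {
            assignment.put(c, new ArrayList<>());
        }
        return assignment;
    }

    public static void main(String[] args) {
        // The docs' example: three topics with five partitions each, concurrency = 15.
        Map<String, Integer> topics = new TreeMap<>();
        topics.put("topicA", 5);
        topics.put("topicB", 5);
        topics.put("topicC", 5);
        System.out.println("range active: " + activeConsumers(range(topics, 15)));
        System.out.println("roundRobin active: " + activeConsumers(roundRobin(topics, 15)));

        // The question's case: two single-partition topics, concurrency = 2.
        Map<String, Integer> question = new TreeMap<>();
        question.put("topic_country1", 1);
        question.put("topic_country2", 1);
        System.out.println("question range: " + range(question, 2));
        System.out.println("question roundRobin: " + roundRobin(question, 2));
    }
}
```

Running main prints range active: 5 and roundRobin active: 15 for the three-topic example, consistent with the quoted docs. For the question's topics, the range model gives both partitions to consumer 0 and leaves consumer 1 idle, while the round-robin model gives each consumer one partition.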