Tags: java, apache-kafka, kafka-consumer-api, spring-kafka

Uniformly distribute Kafka partitions among consumers


I have a topic with 300 partitions and 100 consumer machines. I am using Spring Kafka as the underlying framework to implement the Kafka consumers.

I am using ConcurrentKafkaListenerContainerFactory with the concurrency set to 3, so in theory there should be 300 consumer containers (100 machines × 3 threads), each assigned exactly one partition, which would spread the partitions uniformly across the 100 machines.
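The arithmetic above can be checked with a small model. This is a minimal sketch assuming a simple round-robin rule (partition `p` goes to consumer `p % consumers`); it is not the StickyAssignor algorithm, and the class and method names are hypothetical. It only shows the ideal balanced outcome when all 300 consumer threads are members of the group.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class GroupAssignmentModel {

    /**
     * Simplified round-robin model: partition p goes to consumer p % consumers.
     * Illustrates a perfectly balanced assignment; real assignors may differ.
     */
    static Map<Integer, List<Integer>> assign(int partitions, int consumers) {
        Map<Integer, List<Integer>> assignment = new HashMap<>();
        for (int c = 0; c < consumers; c++) {
            assignment.put(c, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            assignment.get(p % consumers).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        int machines = 100;
        int concurrencyPerMachine = 3;
        int consumers = machines * concurrencyPerMachine; // 300 consumer threads
        Map<Integer, List<Integer>> assignment = assign(300, consumers);
        int max = assignment.values().stream().mapToInt(List::size).max().orElse(0);
        int min = assignment.values().stream().mapToInt(List::size).min().orElse(0);
        System.out.println("consumers=" + consumers + " min=" + min + " max=" + max);
        // prints "consumers=300 min=1 max=1"
    }
}
```

For comparison, `assign(300, 50)` in the same model gives every consumer 6 partitions, which is what a group with only 50 active consumers would look like even under a perfectly balanced assignor.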

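According to the Spring Kafka documentation for ConcurrentMessageListenerContainer: for the first constructor (subscribing by topic), Kafka distributes the partitions across the consumers using its group management. For the second constructor (explicit TopicPartitions), the ConcurrentMessageListenerContainer distributes the TopicPartitions across the delegate KafkaMessageListenerContainers.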

If, say, 6 TopicPartitions are provided and the concurrency is 3, each container gets 2 partitions. For 5 TopicPartitions, 2 containers get 2 partitions and the third gets 1. If the concurrency is greater than the number of TopicPartitions, the concurrency is adjusted down so that each container gets one partition.
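The documented split for explicit TopicPartitions can be sketched as follows. This mirrors the rule as quoted, not Spring's actual source, and it assumes each machine is given a hypothetical 0-based `machineIndex` so it can list its own partitions (here a strided slice). Note the trade-off of the explicit-partition route: Kafka's group management no longer reassigns partitions, so if a machine stops, its partitions go unconsumed until it returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ExplicitAssignmentSketch {

    /** Partitions a hypothetical machine (0-based machineIndex) would list explicitly: a strided slice. */
    static List<Integer> partitionsForMachine(int machineIndex, int machines, int partitions) {
        if (machineIndex < 0 || machineIndex >= machines) {
            throw new IllegalArgumentException("machineIndex must be in [0, machines)");
        }
        return IntStream.range(0, partitions)
                .filter(p -> p % machines == machineIndex)
                .boxed()
                .collect(Collectors.toList());
    }

    /**
     * Splits one machine's partitions across its containers following the documented rule:
     * sizes differ by at most one, the first containers take the extra partitions, and the
     * concurrency is reduced when it exceeds the number of partitions.
     */
    static List<List<Integer>> splitAcrossContainers(List<Integer> partitions, int concurrency) {
        List<List<Integer>> containers = new ArrayList<>();
        int effective = Math.min(concurrency, partitions.size());
        if (effective <= 0) {
            return containers; // nothing to assign
        }
        int base = partitions.size() / effective;
        int remainder = partitions.size() % effective;
        int index = 0;
        for (int c = 0; c < effective; c++) {
            int size = base + (c < remainder ? 1 : 0);
            containers.add(new ArrayList<>(partitions.subList(index, index + size)));
            index += size;
        }
        return containers;
    }

    public static void main(String[] args) {
        // The examples from the documentation quoted above.
        System.out.println(splitAcrossContainers(List.of(0, 1, 2, 3, 4, 5), 3)); // [[0, 1], [2, 3], [4, 5]]
        System.out.println(splitAcrossContainers(List.of(0, 1, 2, 3, 4), 3));    // [[0, 1], [2, 3], [4]]
        System.out.println(splitAcrossContainers(List.of(0, 1), 3));             // [[0], [1]]

        // 300 partitions over 100 machines with concurrency 3: one partition per container.
        List<Integer> mine = partitionsForMachine(0, 100, 300);
        System.out.println(splitAcrossContainers(mine, 3));                      // [[0], [100], [200]]
    }
}
```

The resulting per-machine list could then be supplied as explicit partitions (for example through the `topicPartitions` attribute of `@KafkaListener`) instead of subscribing by topic name.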

But I don't see this behavior: some containers/machines are idle, while others are assigned 6 partitions, which is causing lag on the topic.

Am I doing something wrong? How can I make sure that the partitions are mapped evenly across the containers, so that no container is assigned more than one partition? Please help.

Consumer properties:
key.deserializer : StringDeserializer
value.deserializer : [CUSTOM DESERIALIZER]
enable.auto.commit : false
max.poll.records : 5
group.id : [MY GROUP]
partition.assignment.strategy : StickyAssignor
max.partition.fetch.bytes : 1048576
bootstrap.servers : [SERVERS]
auto.commit.interval.ms : 3000
auto.offset.reset : latest


factory.setConcurrency(3);

@KafkaListener(topics = "#{kafkaTopicConfig.getStoreSupply()}", containerFactory = EI_LISTNER_FACTORY)

EI_LISTNER_FACTORY is the name of the following bean:

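// Assumed imports: org.apache.kafka.clients.consumer.ConsumerConfig,
// org.springframework.context.annotation.Bean, java.util.Map,
// org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory,
// org.springframework.kafka.core.DefaultKafkaConsumerFactory,
// org.springframework.kafka.listener.ContainerProperties.AckMode
@Bean(EI_LISTNER_FACTORY)
public ConcurrentKafkaListenerContainerFactory<String, AggQuantityByPrimeValue> eiKafkaListenerContainerFactory() {

    // Placeholder for [START_UP From Configuration]
    boolean eiConsumerStartup = true;

    int concurrentThreadCount = 3;

    // Placeholder for [properties from ABOVE]; consumerProperties() is a hypothetical helper
    Map<String, Object> config = consumerProperties();
    ConcurrentKafkaListenerContainerFactory<String, AggQuantityByPrimeValue> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));
    factory.setAutoStartup(eiConsumerStartup);
    // Three consumer threads (delegate containers) per application instance, regardless of commit mode.
    factory.setConcurrency(concurrentThreadCount);

    // String.valueOf handles both a String "false" and a Boolean false in the config map.
    if (!Boolean.parseBoolean(String.valueOf(config.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)))) {
        // Commit each offset as soon as the listener calls Acknowledgment.acknowledge().
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    }
    return factory;
}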


Solution

  • The config looks fine. Most likely, when you described the consumer group, a few consumers had become unreachable or idle, so the rebalance assigned multiple partitions to the same container thread on the remaining servers.

    If that is not the case, enable Kafka client logging and monitor the partitions-assigned and partitions-revoked log messages to check whether the rebalance produces the desired outcome.
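Once you have the per-consumer assignment (for example, collected from the partitions-assigned log messages or from describing the consumer group with `kafka-consumer-groups.sh`), a quick check for idle and overloaded consumers might look like this. It is a minimal sketch: the class name, the consumer IDs, and the sample data are hypothetical, and consumers that have left the group will not appear in the input at all.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class AssignmentReport {

    /** Consumers with no partitions, and consumers holding more than the expected maximum. */
    static final class Summary {
        final List<String> idle = new ArrayList<>();
        final List<String> overloaded = new ArrayList<>();
    }

    static Summary summarize(Map<String, List<Integer>> assignment, int expectedMaxPerConsumer) {
        Summary summary = new Summary();
        // TreeMap gives a stable, sorted order for reporting.
        for (Map.Entry<String, List<Integer>> entry : new TreeMap<>(assignment).entrySet()) {
            int count = entry.getValue().size();
            if (count == 0) {
                summary.idle.add(entry.getKey());
            } else if (count > expectedMaxPerConsumer) {
                summary.overloaded.add(entry.getKey());
            }
        }
        return summary;
    }

    public static void main(String[] args) {
        // Hypothetical sample: consumer id -> assigned partitions.
        Map<String, List<Integer>> sample = Map.of(
                "machine-a-0", List.of(0, 1, 2, 3, 4, 5),
                "machine-b-0", List.of(),
                "machine-c-0", List.of(6));
        Summary s = summarize(sample, 1);
        System.out.println("idle=" + s.idle + " overloaded=" + s.overloaded);
        // prints "idle=[machine-b-0] overloaded=[machine-a-0]"
    }
}
```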