Tags: spring-boot, kafka-consumer-api, spring-kafka, java-11

Kafka - CommitFailedException - No timeout poll(max.poll.interval.ms)


I have a Spring Kafka application, and I'm getting an error when the consumer tries to commit an offset. Here is my Kafka consumer configuration:

KafkaConsumer:

@Bean
public ConsumerFactory<String, Item> consumerFactory() {
    log.info("Configuring Kafka Consumer properties - consumerFactory");
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "cpo-executor-groupid");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
            new JsonDeserializer<>(Item.class, false));
}

KafkaAdmin:

@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    return new KafkaAdmin(configs);
}

@Bean
public NewTopic processTopic() {
    return TopicBuilder.name(topicName).partitions(2).build();
}

I know that this happens when processing takes longer than Kafka's max.poll.interval.ms or session.timeout.ms, but that is not my case. My application takes less than 1 second to consume and process a message:

Time: 11:00:32.773 Configuring Kafka Consumer properties - consumerFactory
Time: 11:00:39.433  INFO [cpo-executor,,] 55630 --- [  restartedMain] o.a.k.clients.consumer.ConsumerConfig
Time: 11:00:57.293 ERROR [cpo-executor,,] 55630 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator: 
[Consumer clientId=consumer-cpo-executor-groupid-1, groupId=cpo-executor-groupid] Offset commit failed on partition process-topic-1 at offset 95: 
The coordinator is not aware of this member.
Time: 11:00:57.299 ERROR [cpo-executor,,] 55630 --- [ntainer#0-0-C-1] c.c.cpoexecutor.config.KafkaErrHandler   : Error in process with Exception org.apache.kafka.clients.consumer.CommitFailedException: 
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. 
This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. 
You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. and the records are []

What is happening here, given that I didn't change any Kafka configuration and the default value for max.poll.interval.ms is 5 minutes?
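As a quick sanity check on the timing claim, the log timestamps above can be compared against the 5-minute default. This is only a sketch: the class and method names are made up for illustration, and the whole window from the first to the last log line is taken as a generous upper bound on the gap between two poll() calls.

```java
import java.time.Duration;
import java.time.LocalTime;

// Hypothetical helper, not part of Kafka or Spring Kafka.
class PollGapCheck {
    // Default max.poll.interval.ms (5 minutes), as stated in the question.
    static final Duration DEFAULT_MAX_POLL_INTERVAL = Duration.ofMinutes(5);

    // Elapsed time between two log timestamps in HH:mm:ss.SSS format.
    static Duration elapsed(String from, String to) {
        return Duration.between(LocalTime.parse(from), LocalTime.parse(to));
    }

    // True only if the gap is strictly longer than the default interval.
    static boolean exceedsMaxPollInterval(Duration gap) {
        return gap.compareTo(DEFAULT_MAX_POLL_INTERVAL) > 0;
    }

    public static void main(String[] args) {
        // First and last timestamps from the log excerpt above.
        Duration gap = elapsed("11:00:32.773", "11:00:57.299");
        System.out.println("Elapsed ms: " + gap.toMillis());            // Elapsed ms: 24526
        System.out.println("Exceeds: " + exceedsMaxPollInterval(gap));  // Exceeds: false
    }
}
```

The entire run lasts about 24.5 seconds, so no gap between polls in this excerpt can be anywhere near 300 seconds. That suggests the rebalance was triggered by something other than slow processing.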

kafka: 2.13-2.8.0
spring-kafka: 2.7.6
spring: 2.4.2


Solution

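  • I changed the groupId so that it depends on the environment, which means the same groupId is no longer shared by several environments on the same Kafka cluster. Most likely, instances from different environments were joining the same consumer group, and each join triggered a rebalance that reassigned my consumer's partitions before it could commit.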

    KafkaConsumer:

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String offSetReset;
    
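    @Bean
    public ConsumerFactory<String, Item> consumerFactory() {
        log.info("Configuring Kafka Consumer properties - consumerFactory");
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        // Offset reset policy and group id now come from the active profile's YAML file.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offSetReset);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // The remaining settings are unchanged from the question.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
                new JsonDeserializer<>(Item.class, false));
    }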
    

    application-local.yml:

    spring:
      kafka:
        bootstrap-servers: #####:9092
        consumer:
          auto-offset-reset: earliest
          group-id: cpo-executor-groupid-local
    
    

    application-dev.yml:

    spring:
      kafka:
        bootstrap-servers: #####:9092
        consumer:
          auto-offset-reset: earliest
          group-id: cpo-executor-groupid-dev
    

    application-prd.yml:

    spring:
      kafka:
        bootstrap-servers: #####:9092
        consumer:
          auto-offset-reset: earliest
          group-id: cpo-executor-groupid-prd
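
The naming scheme used in the three YAML files above can be summarized in a small sketch. It is purely illustrative: the class and its methods are hypothetical helpers, not part of Spring or Kafka, and the real application simply reads the finished value from spring.kafka.consumer.group-id.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper showing the "base group id + environment" convention.
class GroupIdPerEnvironment {
    // Appends the environment name to the base group id.
    static String groupIdFor(String baseGroupId, String environment) {
        return baseGroupId + "-" + environment;
    }

    // Builds one group id per environment, keeping the given order.
    static Map<String, String> groupIds(String baseGroupId, List<String> environments) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String env : environments) {
            result.put(env, groupIdFor(baseGroupId, env));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> ids =
                groupIds("cpo-executor-groupid", List.of("local", "dev", "prd"));
        // Prints:
        // local -> cpo-executor-groupid-local
        // dev -> cpo-executor-groupid-dev
        // prd -> cpo-executor-groupid-prd
        ids.forEach((env, id) -> System.out.println(env + " -> " + id));
    }
}
```

Because every environment ends up with a distinct group id, a consumer started locally no longer joins the group used by dev or prd, so it cannot cause a rebalance there.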