Tags: apache-kafka, kafka-consumer-api, spring-kafka

CommitFailedException by Spring Kafka Consumer


I sometimes get the error message below while using a Spring Kafka consumer. I have implemented at-least-once semantics, as shown in the code snippet.

1) Could I be missing a message because of this?

2) Do I need to handle this error? It was not reported by seekToCurrentErrorHandler().

org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.

My Spring Kafka consumer code snippet:

    public class KafkaConsumerConfig implements KafkaListenerConfigurer {

        @Bean
        public SeekToCurrentErrorHandler seekToCurrentErrorHandler() {
            return new SeekToCurrentErrorHandler((record, e) -> {
                System.out.println("RECORD from topic " + record.topic() + " at partition " + record.partition()
                        + " at offset " + record.offset() + " did not process correctly due to a " + e.getCause());
            }, new FixedBackOff(500L, 3L));
        }

        @Bean
        public ConsumerFactory<String, ValidatedConsumerClass> consumerFactory() {
            ErrorHandlingDeserializer<ValidatedConsumerClass> errorHandlingDeserializer =
                    new ErrorHandlingDeserializer<>(new JsonDeserializer<>(ValidatedConsumerClass.class));

            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "grpid-098");
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

            return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), errorHandlingDeserializer);
        }

        @Bean
        KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, ValidatedConsumerClass>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, ValidatedConsumerClass> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.getContainerProperties().setAckMode(AckMode.RECORD);
            factory.setErrorHandler(seekToCurrentErrorHandler());
            return factory;
        }
    }
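Not part of the question itself, but related: the usual way to reduce the chance of being kicked out of the group is to give the consumer a bigger poll budget, either by raising `max.poll.interval.ms` or by lowering `max.poll.records`. A minimal sketch using the plain Kafka property names (the class name and the values here are illustrative assumptions, not recommendations):

```java
import java.util.HashMap;
import java.util.Map;

public class PollTuning {

    // Builds consumer properties that give the listener more time per poll.
    // The keys are standard Kafka consumer config names; the values are
    // only examples for illustration.
    public static Map<String, Object> tunedProps() {
        Map<String, Object> props = new HashMap<>();
        // Allow up to 10 minutes between poll() calls before the broker
        // considers the consumer dead (the default is 5 minutes).
        props.put("max.poll.interval.ms", 600000);
        // Fetch one record per poll so each batch finishes sooner.
        props.put("max.poll.records", 1);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(tunedProps());
    }
}
```

These props would be merged into the map passed to `DefaultKafkaConsumerFactory` above; the trade-off is that a very large poll interval also delays detection of a genuinely stuck consumer.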

The consumer reading the message:

@Service
public class KafKaConsumerService extends AbstractConsumerSeekAware {

    @KafkaListener(id = "foo", topics = "mytopic-5", concurrency = "5", groupId = "mytopic-1-groupid")
    public void consumeFromTopic1(@Payload @Valid ValidatedConsumerClass message, ConsumerRecordMetadata c) {
        databaseService.save(message);
        System.out.println("-- Consumer End -- " + c.partition() + " ---consumer thread-- " + Thread.currentThread().getName());
    }
}

Solution

    1. No, you are not missing anything.
    2. No, you do not need to handle it; the SeekToCurrentErrorHandler (STCEH) already handled it, and the record will be redelivered on the next poll.

    In this case, the exception is raised outside of record processing (after processing is complete). Since the commit failed due to a rebalance, there is no need for the STCEH to re-seek (and it cannot anyway, because the records are no longer available). It simply rethrows the exception.

    Everything works as expected. The following app reproduces the failure by making the listener sleep longer than max.poll.interval.ms:

    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.properties.max.poll.interval.ms=5000
    
    @SpringBootApplication
    public class So69016372Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So69016372Application.class, args);
        }
    
        @KafkaListener(id = "so69016372", topics = "so69016372")
        public void listen(String in, @Header(KafkaHeaders.OFFSET) long offset) throws InterruptedException {
            System.out.println(in + " @" + offset);
            Thread.sleep(6000);
        }
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so69016372").partitions(1).replicas(1).build();
        }
    
    }
    

    Result

    2021-09-01 13:47:26.963  INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so69016372: partitions assigned: [so69016372-0]
    foo @0
    2021-09-01 13:47:31.991  INFO 13195 --- [ad | so69016372] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Member consumer-so69016372-1-f02f8d74-c2b8-47d9-92d3-bf68e5c81a8f sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
    2021-09-01 13:47:32.989  INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Failing OffsetCommit request since the consumer is not part of an active group
    2021-09-01 13:47:32.994 ERROR 13195 --- [o69016372-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
    
    java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
        at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:200) ~[spring-kafka-2.7.6.jar:2.7.6]
        at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.6.jar:2.7.6]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1602) ~[spring-kafka-2.7.6.jar:2.7.6]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) ~[spring-kafka-2.7.6.jar:2.7.6]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
    Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1139) ~[kafka-clients-2.7.1.jar:na]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1004) ~[kafka-clients-2.7.1.jar:na]
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1495) ~[kafka-clients-2.7.1.jar:na]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2710) ~[spring-kafka-2.7.6.jar:2.7.6]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2705) ~[spring-kafka-2.7.6.jar:2.7.6]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2691) ~[spring-kafka-2.7.6.jar:2.7.6]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2489) ~[spring-kafka-2.7.6.jar:2.7.6]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1235) ~[spring-kafka-2.7.6.jar:2.7.6]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.6.jar:2.7.6]
        ... 3 common frames omitted
    
    2021-09-01 13:47:32.994  INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
    2021-09-01 13:47:32.994  INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Lost previously assigned partitions so69016372-0
    2021-09-01 13:47:32.995  INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so69016372: partitions lost: [so69016372-0]
    2021-09-01 13:47:32.995  INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so69016372: partitions revoked: [so69016372-0]
    ...
    2021-09-01 13:47:33.102  INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so69016372: partitions assigned: [so69016372-0]
    foo @0
    2021-09-01 13:47:38.141  INFO 13195 --- [ad | so69016372] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Member consumer-so69016372-1-e6ec685a-d9aa-43d3-b526-b04418095f09 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
    2021-09-01 13:47:39.108  INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Failing OffsetCommit request since the consumer is not part of an active group
    2021-09-01 13:47:39.109 ERROR 13195 --- [o69016372-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
    
    java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
        at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:200) ~[spring-kafka-2.7.6.jar:2.7.6]
        at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.6.jar:2.7.6]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1602) ~[spring-kafka-2.7.6.jar:2.7.6]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) ~[spring-kafka-2.7.6.jar:2.7.6]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
    Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1139) ~[kafka-clients-2.7.1.jar:na]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1004) ~[kafka-clients-2.7.1.jar:na]
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1495) ~[kafka-clients-2.7.1.jar:na]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2710) ~[spring-kafka-2.7.6.jar:2.7.6]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2705) ~[spring-kafka-2.7.6.jar:2.7.6]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2691) ~[spring-kafka-2.7.6.jar:2.7.6]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2489) ~[spring-kafka-2.7.6.jar:2.7.6]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1235) ~[spring-kafka-2.7.6.jar:2.7.6]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.6.jar:2.7.6]
        ... 3 common frames omitted
    
    2021-09-01 13:47:39.109  INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
    2021-09-01 13:47:39.109  INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Lost previously assigned partitions so69016372-0
    2021-09-01 13:47:39.109  INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so69016372: partitions lost: [so69016372-0]
    2021-09-01 13:47:39.109  INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so69016372: partitions revoked: [so69016372-0]
    ...
    2021-09-01 13:47:39.217  INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so69016372: partitions assigned: [so69016372-0]
    foo @0
    

    It will retry indefinitely.
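The loop follows directly from the numbers in the example: the listener sleeps for 6000 ms while max.poll.interval.ms is 5000 ms, so every delivery exceeds the poll budget, the consumer is ejected, and the commit fails again. A tiny sketch of that invariant (the class and method names are made up for illustration):

```java
public class PollBudget {

    // A record's offset only commits successfully when its processing time
    // stays under max.poll.interval.ms; otherwise the consumer is ejected
    // from the group and the record is redelivered after rejoining.
    static boolean commitCanSucceed(long processingMs, long maxPollIntervalMs) {
        return processingMs < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // Numbers from the demo above: 6000 ms of work vs a 5000 ms interval.
        System.out.println(commitCanSucceed(6000, 5000)); // false -> endless redelivery
    }
}
```

With a larger interval (or faster processing) the commit succeeds on the first delivery and the loop never starts.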