I sometimes get the error message below while using a Spring Kafka consumer. I have implemented at-least-once semantics as shown in the code snippet.
1) Will I miss any messages because of this error?
2) Do I need to handle this error? It was not reported by seekToCurrentErrorHandler().
org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
My Spring Kafka consumer configuration:
public class KafkaConsumerConfig implements KafkaListenerConfigurer {

    // configureKafkaListeners(...) is omitted from this snippet

    @Bean
    public SeekToCurrentErrorHandler seekToCurrentErrorHandler() {
        // Log the failed record after 3 retries, 500 ms apart
        return new SeekToCurrentErrorHandler((record, e) -> {
            System.out.println("RECORD from topic " + record.topic() + " at partition " + record.partition()
                    + " at offset " + record.offset() + " did not process correctly due to a " + e.getCause());
        }, new FixedBackOff(500L, 3L));
    }

    @Bean
    public ConsumerFactory<String, ValidatedConsumerClass> consumerFactory() {
        ErrorHandlingDeserializer<ValidatedConsumerClass> errorHandlingDeserializer =
                new ErrorHandlingDeserializer<>(new JsonDeserializer<>(ValidatedConsumerClass.class));
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "grpid-098");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
                errorHandlingDeserializer);
    }

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, ValidatedConsumerClass>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, ValidatedConsumerClass> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Commit the offset after each record is processed
        factory.getContainerProperties().setAckMode(AckMode.RECORD);
        factory.setErrorHandler(seekToCurrentErrorHandler());
        return factory;
    }
}
The consumer that reads the messages:
@Service
public class KafKaConsumerService extends AbstractConsumerSeekAware {

    // The databaseService field declaration is omitted from this snippet

    @KafkaListener(id = "foo", topics = "mytopic-5", concurrency = "5", groupId = "mytopic-1-groupid")
    public void consumeFromTopic1(@Payload @Valid ValidatedConsumerClass message, ConsumerRecordMetadata c) {
        databaseService.save(message);
        System.out.println("-- Consumer End -- " + c.partition() + " ---consumer thread-- " + Thread.currentThread().getName());
    }
}
In this case, the exception is thrown outside of record processing (after processing is complete). Since the commit failed due to a rebalance, there is no need for the SeekToCurrentErrorHandler to re-seek (and it can't anyway because the records are no longer available). It simply rethrows the exception.
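With at-least-once delivery, a record whose offset commit failed is delivered again after the rebalance, so nothing is skipped but the listener may see the same record twice. One way to make that harmless is to make processing idempotent. The following is only a minimal sketch of that idea: `ProcessedRecordTracker` and `processOnce` are hypothetical names, not part of Spring Kafka, and the in-memory set stands in for state that a real service would persist together with its business data.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not a Spring Kafka class): skips records that were already handled.
final class ProcessedRecordTracker {

    // Keys look like "topic-partition@offset". Kept in memory for illustration only;
    // the set is lost on restart, so a real service would store this durably.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    /**
     * Runs the action unless this record was already processed successfully.
     * Assumes a given partition is handled by one thread at a time.
     * Returns true if the action ran, false if the record was a redelivery.
     */
    boolean processOnce(String topic, int partition, long offset, Runnable action) {
        String key = topic + "-" + partition + "@" + offset;
        if (seen.contains(key)) {
            return false; // redelivered after a failed commit: skip it
        }
        action.run();     // if this throws, the key is not recorded and a retry runs it again
        seen.add(key);
        return true;
    }
}
```

In the listener from the question this could wrap the save, for example `tracker.processOnce(c.topic(), c.partition(), c.offset(), () -> databaseService.save(message))`, assuming a `tracker` field holding the helper above.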
Everything works as expected...
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.max.poll.interval.ms=5000
@SpringBootApplication
public class So69016372Application {

    public static void main(String[] args) {
        SpringApplication.run(So69016372Application.class, args);
    }

    @KafkaListener(id = "so69016372", topics = "so69016372")
    public void listen(String in, @Header(KafkaHeaders.OFFSET) long offset) throws InterruptedException {
        System.out.println(in + " @" + offset);
        Thread.sleep(6000);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so69016372").partitions(1).replicas(1).build();
    }

}
Result
2021-09-01 13:47:26.963 INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so69016372: partitions assigned: [so69016372-0]
foo @0
2021-09-01 13:47:31.991 INFO 13195 --- [ad | so69016372] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Member consumer-so69016372-1-f02f8d74-c2b8-47d9-92d3-bf68e5c81a8f sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2021-09-01 13:47:32.989 INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Failing OffsetCommit request since the consumer is not part of an active group
2021-09-01 13:47:32.994 ERROR 13195 --- [o69016372-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:200) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1602) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) ~[spring-kafka-2.7.6.jar:2.7.6]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1139) ~[kafka-clients-2.7.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1004) ~[kafka-clients-2.7.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1495) ~[kafka-clients-2.7.1.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2710) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2705) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2691) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2489) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1235) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.6.jar:2.7.6]
... 3 common frames omitted
2021-09-01 13:47:32.994 INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2021-09-01 13:47:32.994 INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Lost previously assigned partitions so69016372-0
2021-09-01 13:47:32.995 INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so69016372: partitions lost: [so69016372-0]
2021-09-01 13:47:32.995 INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so69016372: partitions revoked: [so69016372-0]
...
2021-09-01 13:47:33.102 INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so69016372: partitions assigned: [so69016372-0]
foo @0
2021-09-01 13:47:38.141 INFO 13195 --- [ad | so69016372] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Member consumer-so69016372-1-e6ec685a-d9aa-43d3-b526-b04418095f09 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2021-09-01 13:47:39.108 INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Failing OffsetCommit request since the consumer is not part of an active group
2021-09-01 13:47:39.109 ERROR 13195 --- [o69016372-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:200) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1602) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) ~[spring-kafka-2.7.6.jar:2.7.6]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1139) ~[kafka-clients-2.7.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1004) ~[kafka-clients-2.7.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1495) ~[kafka-clients-2.7.1.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2710) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2705) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2691) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2489) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1235) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.6.jar:2.7.6]
... 3 common frames omitted
2021-09-01 13:47:39.109 INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2021-09-01 13:47:39.109 INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Lost previously assigned partitions so69016372-0
2021-09-01 13:47:39.109 INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so69016372: partitions lost: [so69016372-0]
2021-09-01 13:47:39.109 INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so69016372: partitions revoked: [so69016372-0]
...
2021-09-01 13:47:39.217 INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so69016372: partitions assigned: [so69016372-0]
foo @0
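The record is redelivered and the cycle repeats indefinitely, because the listener always takes longer (6 seconds) than max.poll.interval.ms (5 seconds).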
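The log message above suggests two remedies: increase max.poll.interval.ms or reduce max.poll.records. As a rough sanity check, the worst-case time between two polls is about the per-record processing time multiplied by max.poll.records, and that needs to stay below max.poll.interval.ms. The sketch below only does this arithmetic; `PollBudget` and its methods are hypothetical names, and the per-record time is an estimate you would have to measure yourself.

```java
import java.time.Duration;

// Hypothetical helper for illustration: checks a consumer's settings against its processing time.
final class PollBudget {

    /** Worst case: every record returned by one poll() takes the full per-record time. */
    static Duration worstCaseBetweenPolls(int maxPollRecords, Duration perRecord) {
        return perRecord.multipliedBy(maxPollRecords);
    }

    /** True if the worst case stays strictly below max.poll.interval.ms. */
    static boolean fits(int maxPollRecords, Duration perRecord, Duration maxPollInterval) {
        return worstCaseBetweenPolls(maxPollRecords, perRecord).compareTo(maxPollInterval) < 0;
    }
}
```

For the demo above, `PollBudget.fits(1, Duration.ofMillis(6000), Duration.ofMillis(5000))` is false even for a single record, which matches the repeated rebalances in the log. With the Kafka default of 5 minutes for max.poll.interval.ms, the same listener would fit.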