Search code examples
javaapache-kafkakafka-consumer-api

o.a.k.c.c.i.SubscriptionState.maybeSeekUnvalidated:397 What does it mean?


I am using Java Kafka and when I am consuming messages, using offsetfortimes, intermittently, I am getting an log thread

INFO  o.a.k.c.c.i.SubscriptionState.maybeSeekUnvalidated:397 - Resetting offset for partition XXX to offset 8793363.

And after this message I am not able to get any message from Kafka Consumer. Only when I restart the consumer the offsetfortimes is calculated again and my consumer is picking up those messages.

I would like to understand what this maybeSeekUnvalidated means, and why my offset is being reset.

My consumer starter code:

kafkaConsumer = consumerHelperFactory.getConsumerInstance(consumerConfiguration);
        long duration = startTime.until(Instant.now(), MINUTES)+1;
        log.info("Consumer consuming messages {} minutes ago",duration);
        Map<TopicPartition, Long> query = new HashMap<>();
        query.put(new TopicPartition(topic, 0), Instant.now().minus(duration, MINUTES).toEpochMilli());
        Map<TopicPartition, OffsetAndTimestamp> result = kafkaConsumer.offsetsForTimes(query);
        kafkaConsumer.assign(result.keySet().stream().collect(Collectors.toList()));            

Solution

  • When you use assign API instead of subscribe, consumer groups are not used, and therefore Kafka doesn't track offsets for you.

    As answered in your previous question, auto.offset.reset=latest is being applied until you reseek the consumer elsewhere to the offset you want. When you're at the latest offset, there's nothing to consume without some active producer sending new records... You can set the value to none if you want to manage offsets on your own.