Tags: apache-kafka, kafka-consumer-api, kafka-producer-api

Consumer fetch data returns OFFSET_OUT_OF_RANGE


I have a cluster with 3 Kafka brokers and a topic called fallback_topic. There is only one consumer group that consumes from this topic, and only one consumer in that group.

After injecting a few messages, I can see that they have been published to Kafka: the LogSize has advanced with the new messages. However, the Consumer Offset stays the same and no message is ever consumed.

Below is the log from when consumer.poll(3000) ran. Partitions 4, 7, and 10 received new messages from the producer, but when the consumer tried to read them, it reported error=OFFSET_OUT_OF_RANGE:

04:20:41.311 [kafka-coordinator-heartbeat-thread | uniqueConsumerGroup] DEBUG o.a.k.clients.FetchSessionHandler - [Consumer clientId=consumer-1, groupId=uniqueConsumerGroup] Node 654000 sent a full fetch response that created a new incremental fetch session 685508830 with 7 response partition(s)
04:20:41.311 [kafka-coordinator-heartbeat-thread | uniqueConsumerGroup] DEBUG o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=uniqueConsumerGroup] Fetch READ_UNCOMMITTED at offset 1062 for partition fallback_topic-1 returned fetch data (error=NONE, highWaterMark=1062, lastStableOffset = -1, logStartOffset = 1062, abortedTransactions = null, recordsSizeInBytes=0)
04:20:41.311 [kafka-coordinator-heartbeat-thread | uniqueConsumerGroup] DEBUG o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=uniqueConsumerGroup] Fetch READ_UNCOMMITTED at offset 124094 for partition fallback_topic-4 returned fetch data (error=OFFSET_OUT_OF_RANGE, highWaterMark=-1, lastStableOffset = -1, logStartOffset = -1, abortedTransactions = null, recordsSizeInBytes=0)
04:20:41.311 [kafka-coordinator-heartbeat-thread | uniqueConsumerGroup] DEBUG o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=uniqueConsumerGroup] Fetch READ_UNCOMMITTED at offset 762 for partition fallback_topic-7 returned fetch data (error=OFFSET_OUT_OF_RANGE, highWaterMark=-1, lastStableOffset = -1, logStartOffset = -1, abortedTransactions = null, recordsSizeInBytes=0)
04:20:41.311 [kafka-coordinator-heartbeat-thread | uniqueConsumerGroup] DEBUG o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=uniqueConsumerGroup] Fetch READ_UNCOMMITTED at offset 897 for partition fallback_topic-10 returned fetch data (error=OFFSET_OUT_OF_RANGE, highWaterMark=-1, lastStableOffset = -1, logStartOffset = -1, abortedTransactions = null, recordsSizeInBytes=0)
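
For reference, OFFSET_OUT_OF_RANGE means the offset the consumer requested lies outside the broker's current valid range for that partition, i.e. between logStartOffset and the log end offset. Here is a minimal diagnostic sketch, assuming the standard kafka-clients API; the OffsetRangeCheck class and its check method are only illustrative names:

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class OffsetRangeCheck {
        // Prints the broker's valid offset range and the group's committed
        // offset for one partition. A committed offset outside [start, end)
        // triggers OFFSET_OUT_OF_RANGE on the next fetch.
        static void check(KafkaConsumer<String, byte[]> consumer, String topic, int partition) {
            TopicPartition tp = new TopicPartition(topic, partition);
            long start = consumer.beginningOffsets(Collections.singleton(tp)).get(tp); // logStartOffset
            long end = consumer.endOffsets(Collections.singleton(tp)).get(tp);         // log end offset
            Map<TopicPartition, OffsetAndMetadata> committed =
                    consumer.committed(Collections.singleton(tp)); // kafka-clients 2.4+
            OffsetAndMetadata om = committed.get(tp);
            System.out.printf("%s: valid range [%d, %d), committed=%s%n",
                    tp, start, end, om == null ? "none" : om.offset());
        }
    }

In the log above, fallback_topic-1 read cleanly at offset 1062, which equals its logStartOffset, while partitions 4, 7, and 10 returned -1 for every range field along with the error.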

My understanding is that this error happens when the partition leader has moved the offset but a follower hasn't caught up yet. But there was no broker outage, so the consumer was talking to the same leader the whole time. Can anyone help me understand why there is an OFFSET_OUT_OF_RANGE error? Thank you very much. Below is my code; I skipped consumer.commitAsync() because my problem happens before the commit (a note on the out-of-range fallback configuration follows the code).

        String topic = "fallback_topic";
        List<Event> events = new ArrayList<Event>();
        consumer.subscribe(Arrays.asList(topic));
        ConsumerRecords<String, byte[]> records;

        do {
            logger.info("Start polling messages from " + topic);
            // poll(long) is deprecated in newer clients in favor of poll(Duration)
            records = consumer.poll(3000);
            logger.info("done polling.");

            records.partitions().forEach(tp ->
                    logger.info("found records from " + tp.topic() + "-" + tp.partition()));
            for (ConsumerRecord<String, byte[]> record : records) {
                Event event = EventKafkaSerializer.serializer
                        .deserializeEvent(new ByteArrayInputStream(record.value()));
                logger.info(event.getId() + " " + event.getData().toString());
                events.add(event);
            }
        } while (records.count() > 0); // stop once a poll returns no records

        logger.info("Found total events " + events.size());

Solution

  • Found out why: I forgot to call consumer.close() at the end.
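
A minimal sketch of the fix, reusing the hypothetical ConsumerFactory from the configuration sketch above: KafkaConsumer implements Closeable, so try-with-resources guarantees close() runs even when processing throws.

    import java.time.Duration;
    import java.util.Arrays;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class FallbackConsumer {
        public static void main(String[] args) {
            // try-with-resources calls close() even if processing throws.
            // close() commits offsets when auto-commit is enabled and leaves
            // the consumer group cleanly.
            try (KafkaConsumer<String, byte[]> consumer = ConsumerFactory.create()) {
                consumer.subscribe(Arrays.asList("fallback_topic"));
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(3000));
                records.forEach(r ->
                        System.out.println(r.topic() + "-" + r.partition() + " @ " + r.offset()));
            }
        }
    }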