
Uncommitted event is not received in the next poll


I have a consumer with max.poll.records set to 1 and enable.auto.commit set to false for manual offset control. However, even when I don't call commitSync, the subsequent poll returns the next event. Here are the details: I produced 4 events onto a topic, and in the consumer I skip commitSync for the third event. I expected the third event to be returned by the next poll, but the fourth event was returned instead. I am puzzled how Event3 got committed.

private static void pauseAndResume() {
    int retryDelay = 5; // seconds
    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
    SimpleProducer.produce(4); //(produces Event1, Event2, Event3, Event4)
    Properties properties = new Properties();
    String topicName = "output-topic";
    properties.put("bootstrap.servers", "localhost:29092");
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("group.id", "test-group");
    properties.put("max.poll.records", 1);
    properties.put("enable.auto.commit", false);
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
    List<String> topics = new ArrayList<String>();
    topics.add(topicName);
    kafkaConsumer.subscribe(topics);
    Collection<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
    PartitionInfo partitionInfo = kafkaConsumer.partitionsFor(topicName).get(0);
    topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
    int eventsCount = 0;
    try {
        Date pausedAt = new Date();
        while (true) {
            if (!kafkaConsumer.paused().isEmpty()) {
                if ((new Date().getTime() - pausedAt.getTime()) / 1000 % 60 >= retryDelay) {
                    System.out.println("Resuming Consumer..." + sdf.format(new Date()));
                    kafkaConsumer.resume(topicPartitions);
                }
            }
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100)); // poll(long) is deprecated
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(++eventsCount + ":" + record.value()); // count each delivered record
                if (record.value().equals("Event3")) {
                    System.out.println("consumer is pausing...... for about " + retryDelay + " seconds " + sdf.format(new Date()));
                    kafkaConsumer.pause(topicPartitions);
                    pausedAt = new Date();      
                    break;
                }else {
                    kafkaConsumer.commitSync();
                }
            }
        }
    } catch (Exception e) {
        System.out.println(e.getMessage());
    } finally {
        kafkaConsumer.close();
    }
}

The linked KafkaConsumer<K,V> documentation doesn't explain how to stop the offset from advancing :( I suspect some internal logic detected that Event3 would be polled indefinitely and returned Event4 instead. Based on my research (Google and the Kafka forums), I expected Event3 to be replayed because it was not committed, but that is not happening. Could someone point me in the right direction?

Many Thanks
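Why Event4 comes back can be reproduced with a small, hypothetical model of a single partition. The class `PositionVsCommitDemo` and its methods are illustrative names, not part of the Kafka client API. The sketch assumes the documented consumer rule that `poll()` advances an in-memory position on its own, while the committed offset is only read back when the consumer starts up or is reassigned the partition (modeled here by `restart()`).

```java
import java.util.List;

// Hypothetical, simplified model of ONE partition read by ONE consumer.
// Not the Kafka client API: it only mimics the position vs. committed-offset rules.
public class PositionVsCommitDemo {
    private final List<String> log; // record values at offsets 0, 1, 2, ...
    private long position = 0;      // in-memory offset of the next record poll() returns
    private long committed = 0;     // committed offset: where a restarted consumer resumes

    PositionVsCommitDemo(List<String> log) {
        this.log = log;
    }

    // Mimics poll() with max.poll.records=1: return the record at the current
    // position and advance the position, whether or not anything was committed.
    String poll() {
        if (position >= log.size()) {
            return null; // nothing left to read
        }
        String value = log.get((int) position);
        position++;
        return value;
    }

    // Mimics commitSync() with no arguments: commit the current position,
    // i.e. the offset after the last record returned by poll().
    void commitSync() {
        committed = position;
    }

    // Mimics a restart or partition reassignment: the position is reloaded
    // from the committed offset.
    void restart() {
        position = committed;
    }

    long committed() {
        return committed;
    }

    public static void main(String[] args) {
        PositionVsCommitDemo consumer =
                new PositionVsCommitDemo(List.of("Event1", "Event2", "Event3", "Event4"));
        String value;
        while ((value = consumer.poll()) != null) {
            System.out.println(value); // Event1, Event2, Event3, Event4 in order
            if (!value.equals("Event3")) {
                consumer.commitSync(); // Event3 is deliberately left uncommitted
            }
        }
        System.out.println("committed offset: " + consumer.committed()); // 4
    }
}
```

In the question's loop the `commitSync()` after Event4 commits offset 4, which also covers Event3, so even restarting the consumer would not replay it.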


Solution

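  • I figured out a workaround: explicitly seek back to the record's offset on the topic partition. The consumer keeps its position in memory and poll() advances it whether or not you commit; the committed offset is only used when the consumer starts up or is reassigned the partition. Calling seek() before pausing rewinds the position, so Event3 is returned again after resume.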

    // In this use case we consume from a single topic that has only one partition.
    // Call this inside the loop, before pause(), while `record` (Event3) is in scope.
    kafkaConsumer.seek(topicPartitions.iterator().next(), record.offset());
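The effect of the seek workaround can be sketched with a simplified simulation of the question's loop. `SeekReplayDemo.deliveries` is a hypothetical helper, not Kafka API: it hands out one record per "poll", and the first time `Event3` is seen it either leaves the position alone (the original code) or rewinds it to the record's offset, as `kafkaConsumer.seek(partition, record.offset())` does. Pausing and resuming are left out because they only delay the next poll; they do not move the position.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the question's consume loop, one record per poll.
// Not the Kafka client API: "position" stands in for the consumer's in-memory offset.
public class SeekReplayDemo {

    // Returns every value "poll()" hands out, in order. The first time failOnce is
    // seen, the record is treated as failed; if seekBack is true the position is
    // rewound to that record's offset, mimicking seek(partition, record.offset()).
    static List<String> deliveries(List<String> log, String failOnce, boolean seekBack) {
        List<String> delivered = new ArrayList<>();
        boolean alreadyFailed = false;
        long position = 0;
        while (position < log.size()) {
            long offset = position;              // offset of the record this poll returns
            String value = log.get((int) offset);
            position = offset + 1;               // poll() advances the position right away
            delivered.add(value);
            if (value.equals(failOnce) && !alreadyFailed) {
                alreadyFailed = true;            // the original code pauses here
                if (seekBack) {
                    position = offset;           // the workaround: rewind to the failed record
                }
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<String> log = List.of("Event1", "Event2", "Event3", "Event4");
        System.out.println(deliveries(log, "Event3", false)); // [Event1, Event2, Event3, Event4]
        System.out.println(deliveries(log, "Event3", true));  // [Event1, Event2, Event3, Event3, Event4]
    }
}
```

With the real consumer, committing only after a record succeeds (for example `commitSync(Map<TopicPartition, OffsetAndMetadata>)` with `record.offset() + 1`) also means a restart resumes from the failed record.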