Search code examples
apache-kafkacommitoffsetkafka-consumer-api

Kafka commitAsync Retries with Commit Order


I'm reading through Kafka the Definitive Guide and in the chapter on Consumers there is a blurb on "Retrying Async Commits":

A simple pattern to get commit order right for asynchronous retries is to use a monotonically increasing sequence number. Increase the sequence number every time you commit and add the sequence number at the time of the commit to the commitAsync callback. When you're getting ready to send a retry, check if the commit sequence number the callback got is equal to the instance variable; if it is, there was no newer commit and it is safe to retry. If the instance sequence number is higher, don't retry because a newer commit was already sent.

A quick example by the author would have been great here for dense folks like me. I am particularly unclear about the portion I bolded above.

Can anyone shed light on what this means or even better provide a toy example demonstrating this?


Solution

  • Here is what I think it is, but got to humble I could be wrong

          try {
            AtomicInteger atomicInteger = new AtomicInteger(0);
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(5);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.format("offset: %d\n", record.offset());
                    System.out.format("partition: %d\n", record.partition());
                    System.out.format("timestamp: %d\n", record.timestamp());
                    System.out.format("timeStampType: %s\n", record.timestampType());
                    System.out.format("topic: %s\n", record.topic());
                    System.out.format("key: %s\n", record.key());
                    System.out.format("value: %s\n", record.value());
                }
    
                consumer.commitAsync(new OffsetCommitCallback() {
                    private int marker = atomicInteger.incrementAndGet();
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                                           Exception exception) {
                        if (exception != null) {
                            if (marker == atomicInteger.get()) consumer.commitAsync(this);
                        } else {
                            //Cant' try anymore
                        }
                    }
                });
            }
        } catch (WakeupException e) {
            // ignore for shutdown
        } finally {
            consumer.commitSync(); //Block
            consumer.close();
            System.out.println("Closed consumer and we are done");
        }