I need some help with Kafka Streams. I have started a Kafka Streams application that consumes one topic from the very first offset. The topic holds a very large amount of data, so I want to implement a mechanism in my application, using Kafka Streams, that notifies me when the topic has been read completely, up to the very last offset.
I have read the Kafka Streams 2.8.0 API and found a method, allLocalStorePartitionLags, which returns a map of store names to another map from each partition to the lag information for that partition. This method returns lag information for all store partitions (active or standby) local to this Streams instance. It is quite useful in the case above when only one node is running the Streams application.
But in my case the system is distributed: there are 3 application nodes and 10 topic partitions, which means each node reads at least 3 partitions of the topic.
I need help here. How can I implement this functionality so that I get notified when the topic has been read completely, from partition 0 to partition 9? Please note that I don't have the option of using a database as of now.
Other approaches to achieve this goal are also welcome. Thank you.
I was able to get the lag information from the AdminClient API. The code below retrieves the end offsets and the current offsets for each partition of the topics read by the given Streams application, identified by its applicationId.
// Kafka Streams uses the application.id as its consumer group id.
AdminClient adminClient = AdminClient.create(kafkaProperties);
ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = adminClient.listConsumerGroupOffsets(applicationId);
// Current (committed) offsets per partition.
Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap = listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get();
// All topic partitions that have a committed offset.
Set<TopicPartition> topicPartitions = topicPartitionOffsetAndMetadataMap.keySet();
// End offsets for each partition.
ListOffsetsResult listOffsetsResult = adminClient.listOffsets(topicPartitions.stream()
        .collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())));
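The snippet above stops once both sets of offsets have been requested. The remaining step is to compare them, which might look roughly like the sketch below. It is only an illustration under some assumptions: LagCheck, totalLag and isCaughtUp are hypothetical names, not part of the Kafka API, and the two maps are assumed to have been filled beforehand with plain offsets keyed by partition number (for example from OffsetAndMetadata.offset() and from the offset() of each entry in listOffsetsResult.all().get()). Because the committed offsets belong to the whole consumer group, a check like this can run on any one of the nodes. Committed offsets only advance when Kafka Streams commits, so the result can trail the actual processing slightly.

```java
import java.util.Map;

// Hypothetical helper, not part of the Kafka API: it works on plain offsets
// keyed by partition number so that it can run without a broker.
final class LagCheck {

    // Sum of (end offset - committed offset) over all partitions of the topic.
    // A partition with no committed offset is treated as if nothing has been
    // read yet (offset 0), which is an assumption of this sketch.
    static long totalLag(Map<Integer, Long> committed, Map<Integer, Long> end) {
        long lag = 0L;
        for (Map.Entry<Integer, Long> e : end.entrySet()) {
            long current = committed.getOrDefault(e.getKey(), 0L);
            // Guard against a negative difference if the two reads race.
            lag += Math.max(0L, e.getValue() - current);
        }
        return lag;
    }

    // True once every partition's committed offset has reached its end offset.
    static boolean isCaughtUp(Map<Integer, Long> committed, Map<Integer, Long> end) {
        return totalLag(committed, end) == 0L;
    }
}
```

Calling isCaughtUp periodically and sending the notification the first time it returns true would be one way to use this. If producers are still writing to the topic, the end offsets keep moving, so it may help to capture them once at startup and compare against that fixed snapshot.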