java, apache-kafka, kafka-consumer-api

How to check a partition is assigned in Kafka


I am trying to find a way to check whether a partition is assigned before I do something with that partition. I know we can check whether the consumer has any assignment at all with

 consumer.assignment().isEmpty()

But how do I do this for an individual partition? My code is below:

List<PartitionInfo> myPartitionInfo = consumer.partitionsFor(..);
myPartitionInfo.stream()
   // before reaching this map, I need to check whether this partition is assigned
   .map(part -> new TopicPartition(subTopicName, part.partition()))
   .......;

Is there any way I can do that? Please suggest.


Solution

  • You've already answered your own question: use consumer.assignment().

    // Data to check
    String topic = "...";
    int partition = 0;
    
    // Check it
    boolean isAssigned = consumer.assignment()
      .stream()
      .anyMatch(tp -> tp.topic().equals(topic) && tp.partition() == partition);
    

    Otherwise, in your consumer poll loop, you also have access to the topic and partition that each record comes from (record.topic() and record.partition()), and can check there.
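
    Applied to the stream in the question, the same check can be a filter step. The following is only a minimal sketch: so that it runs without the Kafka client on the classpath, a small record stands in for org.apache.kafka.common.TopicPartition (the real class also implements equals/hashCode, so Set.contains behaves the same way). The class name AssignedPartitionFilter, the helper assignedOnly, and the topic name "orders" are hypothetical and used for illustration only.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class AssignedPartitionFilter {

    // Stand-in for org.apache.kafka.common.TopicPartition (assumption made
    // so the sketch compiles without the Kafka client library).
    record TopicPartition(String topic, int partition) {}

    // Keeps only the partitions of `topic` that are present in `assignment`.
    // With a real consumer, `assignment` would be consumer.assignment() and
    // the partition ids would come from consumer.partitionsFor(topic).
    static List<TopicPartition> assignedOnly(Set<TopicPartition> assignment,
                                             String topic,
                                             List<Integer> partitionIds) {
        return partitionIds.stream()
                .map(id -> new TopicPartition(topic, id))
                .filter(assignment::contains) // the "is it assigned?" check
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical assignment: partitions 0 and 2 of topic "orders".
        Set<TopicPartition> assignment = Set.of(
                new TopicPartition("orders", 0),
                new TopicPartition("orders", 2));

        // Partition 1 is not in the assignment and is filtered out.
        System.out.println(assignedOnly(assignment, "orders", List.of(0, 1, 2)));
    }
}
```

    With the real client, mapping to TopicPartition first and then filtering with consumer.assignment()::contains should work the same way. Keep in mind that when you use subscribe(), assignment() is typically empty until poll() has been called and the group has rebalanced.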