Tags: java, spring, spring-boot, apache-kafka, publish-subscribe

Spring Kafka - Selecting @TopicPartition deactivates the consumer in a group


I have two consumer groups, first and second, subscribed to the topic my-topic, using Spring Boot 2.1.8. I have two services, each using a separate consumer group.

Using a minimal setup, I define a listener:

@KafkaListener(topics = "my-topic")
public void consume(@Payload String message) {
    logger.info(String.format("#### -> Consumed message -> %s", message));
}

Listing the groups with kafka-consumer-groups shows the following:

root@8d49c1b2c3bf:/# kafka-consumer-groups --bootstrap-server localhost:9092 --describe --all-groups

GROUP   TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST         CLIENT-ID
first   my-topic  2          0               0               0    consumer-2-42b222ac-998a-4629-8358-1846c85f37e7  /172.18.0.9  consumer-2
first   my-topic  0          0               0               0    consumer-2-42b222ac-998a-4629-8358-1846c85f37e7  /172.18.0.9  consumer-2
first   my-topic  1          0               0               0    consumer-2-42b222ac-998a-4629-8358-1846c85f37e7  /172.18.0.9  consumer-2

GROUP   TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST         CLIENT-ID
second  my-topic  0          0               0               0    consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8  /172.18.0.8  consumer-2
second  my-topic  1          0               0               0    consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8  /172.18.0.8  consumer-2
second  my-topic  2          0               0               0    consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8  /172.18.0.8  consumer-2

I want the consumer in the first group to use only partitions 0 and 2, so I try to achieve this with the following listener:

@KafkaListener(topicPartitions = {
    @TopicPartition(
        topic = "my-topic",
        partitions = { "0", "2" }
    )}
)
public void consume(@Payload String message) {
    logger.info(String.format("#### -> Consumed message -> %s", message));
}

Why doesn't the same command show the same listing as above for the first group, just without partition 1? Instead, the CONSUMER-ID, HOST and CLIENT-ID columns are empty, and the @KafkaListener receives no messages at all for the first group (the listener in the second group still works as before). How can I fix this?

Consumer group 'first' has no active members.

GROUP   TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST         CLIENT-ID
first   my-topic  2          0               0               0    -                                                -            -
first   my-topic  0          0               0               0    -                                                -            -

GROUP   TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST         CLIENT-ID
second  my-topic  0          0               0               0    consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8  /172.18.0.8  consumer-2
second  my-topic  1          0               0               0    consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8  /172.18.0.8  consumer-2
second  my-topic  2          0               0               0    consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8  /172.18.0.8  consumer-2

Solution

  • Manually Assigning All Partitions: since you are assigning the consumer thread to specific partitions, the consumer's assign() method is used and Kafka does not use group coordination. Each consumer acts independently, even if it shares a groupId with another consumer.

    If you always want to read all records from all partitions (for example, when using a compacted topic to load a distributed cache), it can be useful to manually assign the partitions and not use Kafka's group management.

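    However, the command for checking consumer positions only reports members (CONSUMER-ID, HOST, CLIENT-ID) for consumers that use group coordination. A manually assigned consumer never joins the group, so the tool reports that the group has no active members and shows "-" in those columns, while offsets committed under the group id are still listed.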

    From Checking consumer position in the Kafka documentation:

    Sometimes it's useful to see the position of your consumers. We have a tool that will show the position of all consumers in a consumer group as well as how far behind the end of the log they are. To run this tool on a consumer group named my-group consuming a topic named my-topic...

    From the KafkaConsumer Javadoc:

    Manual partition assignment does not use group coordination, so consumer failures will not cause assigned partitions to be rebalanced. Each consumer acts independently even if it shares a groupId with another consumer. To avoid offset commit conflicts, you should usually ensure that the groupId is unique for each consumer instance.

    Finally, the listener is not consuming messages because of the starting offset: by default the offset reset policy (auto.offset.reset) is latest, so you need to specify the initial offset explicitly, for example (from the Spring for Apache Kafka reference):

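    @KafkaListener(id = "thing2", topicPartitions = {
        // topic1: partitions 0 and 1, starting from the committed offset (or auto.offset.reset)
        @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
        // topic2: partition 0 as above, plus partition 1 starting at absolute offset 100
        @TopicPartition(topic = "topic2", partitions = "0",
            partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
    })
    public void listen(ConsumerRecord<?, ?> record) {
        // handle the record; partition() and offset() show where it came from
        logger.info("Received {} (partition {}, offset {})",
                record.value(), record.partition(), record.offset());
    }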
    

    For more information about initial offsets, from the Spring for Apache Kafka reference (ContainerProperties constructors):

    The first constructor takes an array of TopicPartitionOffset arguments to explicitly instruct the container about which partitions to use (using the consumer assign() method) and with an optional initial offset. A positive value is an absolute offset by default. A negative value is relative to the current last offset within a partition by default. A constructor for TopicPartitionOffset that takes an additional boolean argument is provided. If this is true, the initial offsets (positive or negative) are relative to the current position for this consumer. The offsets are applied when the container is started.

    Note: You can specify each partition in either the partitions or the partitionOffsets attribute, but not both.
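To see the assign() behaviour described in the answer without Spring in the way, here is a minimal sketch using the plain kafka-clients consumer. It assumes kafka-clients 2.0 or later (for poll(Duration)), a broker at localhost:9092 as in the question, and String keys and values; the class name ManualAssignmentSketch and the method pinnedConsumer are hypothetical. The consumer is pinned to partitions 0 and 2 with assign(), so it never joins the group, yet commitSync() still stores offsets under the group id.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualAssignmentSketch {

    // Hypothetical helper: builds a consumer pinned to partitions 0 and 2 of my-topic.
    static KafkaConsumer<String, String> pinnedConsumer(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // group.id is only used as the key for committed offsets; assign() does not join the group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // offsets are committed explicitly in main() instead of automatically
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        List<TopicPartition> partitions = Arrays.asList(
                new TopicPartition("my-topic", 0),
                new TopicPartition("my-topic", 2));
        // manual assignment: no group membership, no rebalancing
        consumer.assign(partitions);
        return consumer;
    }

    public static void main(String[] args) {
        try (KafkaConsumer<String, String> consumer = pinnedConsumer("localhost:9092", "first")) {
            // for this sketch, start from the earliest offsets instead of relying on auto.offset.reset
            consumer.seekToBeginning(consumer.assignment());
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
                if (!records.isEmpty()) {
                    // stored under group.id, which is why kafka-consumer-groups still lists the offsets
                    consumer.commitSync();
                }
            }
        }
    }
}
```

Running kafka-consumer-groups --describe against this consumer should look like the second listing in the question: committed offsets for partitions 0 and 2, but no member columns.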
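Applied to the question, a listener for the first group that reads only partitions 0 and 2 from an explicit initial offset might look like the following sketch. It assumes a Spring Boot application with spring-kafka on the classpath and String deserializers configured; the class name FirstGroupListener is hypothetical, and starting both partitions at absolute offset 0 is a choice made for illustration. Each partition appears only in partitionOffsets, not also in partitions, following the note in the answer.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

@Component
public class FirstGroupListener {

    private static final Logger logger = LoggerFactory.getLogger(FirstGroupListener.class);

    // Manually assigns partitions 0 and 2 of my-topic; the group id is only used for commits.
    // Each partition starts at absolute offset 0 when the container starts.
    @KafkaListener(groupId = "first", topicPartitions = {
        @TopicPartition(topic = "my-topic", partitionOffsets = {
            @PartitionOffset(partition = "0", initialOffset = "0"),
            @PartitionOffset(partition = "2", initialOffset = "0")
        })
    })
    public void consume(ConsumerRecord<String, String> record) {
        logger.info("#### -> Consumed message -> {} (partition {}, offset {})",
                record.value(), record.partition(), record.offset());
    }
}
```

Because the initial offsets are applied whenever the container starts, this replays both partitions from offset 0 on every restart. That fits a cache-loading use case; if you only want to skip the latest reset on the first run, a different initial offset or reset policy may suit better.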
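The initial-offset rules in the quoted ContainerProperties paragraph (positive is absolute, negative is relative to the end, and the optional flag makes either relative to the current position) can be summarised as a small pure function. This is a hypothetical model for illustration, not Spring's implementation: the class name InitialOffsetModel is invented, and treating 0 as an absolute offset and clamping negative results to 0 are assumptions of this sketch.

```java
public final class InitialOffsetModel {

    private InitialOffsetModel() { }

    /**
     * Hypothetical model of how an initial offset is interpreted.
     *
     * @param initialOffset     configured initial offset (positive, zero, or negative)
     * @param relativeToCurrent true if the offset is relative to the consumer's current position
     * @param currentPosition   the consumer's current position in the partition
     * @param endOffset         the partition's current end (log-end) offset
     * @return the offset the consumer would seek to
     */
    public static long resolve(long initialOffset, boolean relativeToCurrent,
                               long currentPosition, long endOffset) {
        long target;
        if (relativeToCurrent) {
            // relative to where the consumer currently is (positive or negative)
            target = currentPosition + initialOffset;
        } else if (initialOffset >= 0) {
            // absolute offset; 0 is treated as absolute here (assumption)
            target = initialOffset;
        } else {
            // negative: relative to the current last offset of the partition
            target = endOffset + initialOffset;
        }
        // Assumption for this sketch: results below 0 are clamped to 0
        return Math.max(0, target);
    }

    public static void main(String[] args) {
        System.out.println(resolve(100, false, 40, 500));  // 100
        System.out.println(resolve(-10, false, 40, 500));  // 490
        System.out.println(resolve(5, true, 40, 500));     // 45
        System.out.println(resolve(-10, true, 40, 500));   // 30
    }
}
```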