I have two consumer groups, first and second, subscribed to the topic my-topic, using Spring Boot version 2.1.8. I have two services, each using a separate consumer group.
Using the minimal setup I define a listener:
@KafkaListener(topics = "my-topic")
public void consume(@Payload String message) {
    logger.info(String.format("#### -> Consumed message -> %s", message));
}
The listing from kafka-consumer-groups displays the following:
root@8d49c1b2c3bf:/# kafka-consumer-groups --bootstrap-server localhost:9092 --describe --all-groups
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
first my-topic 2 0 0 0 consumer-2-42b222ac-998a-4629-8358-1846c85f37e7 /172.18.0.9 consumer-2
first my-topic 0 0 0 0 consumer-2-42b222ac-998a-4629-8358-1846c85f37e7 /172.18.0.9 consumer-2
first my-topic 1 0 0 0 consumer-2-42b222ac-998a-4629-8358-1846c85f37e7 /172.18.0.9 consumer-2
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
second my-topic 0 0 0 0 consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8 consumer-2
second my-topic 1 0 0 0 consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8 consumer-2
second my-topic 2 0 0 0 consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8 consumer-2
I want the consumer in the first group to use only partitions 0 and 2, so I tried to achieve that with this listener:
@KafkaListener(topicPartitions = {
    @TopicPartition(
        topic = "my-topic",
        partitions = { "0", "2" }
    )
})
public void consume(@Payload String message) {
    logger.info(String.format("#### -> Consumed message -> %s", message));
}
Why doesn't the listing from the same command show the same output as above, minus partition 1 for the first group? In addition, the CONSUMER-ID column and the columns after it are empty, and the @KafkaListener receives no messages at all for the first group (the listener in the second group works as before). How can I fix this?
Consumer group 'first' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
first my-topic 2 0 0 0 - - -
first my-topic 0 0 0 0 - - -
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
second my-topic 0 0 0 0 consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8 consumer-2
second my-topic 1 0 0 0 consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8 consumer-2
second my-topic 2 0 0 0 consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8 consumer-2
Manually Assigning All Partitions
Because you are assigning the consumer thread to specific partitions, Kafka uses the assign() method and does not use group coordination. Each consumer acts independently, even if it shares a groupId with another consumer.
Let’s say you want to always read all records from all partitions (such as when using a compacted topic to load a distributed cache), it can be useful to manually assign the partitions and not use Kafka’s group management.
However, the command that checks consumer positions only reports active members for consumers that use group coordination, which is why the first group is listed as having no active members.
Sometimes it's useful to see the position of your consumers. We have a tool that will show the position of all consumers in a consumer group as well as how far behind the end of the log they are. To run this tool on a consumer group named my-group consuming a topic named my-topic
Manual partition assignment does not use group coordination, so consumer failures will not cause assigned partitions to be rebalanced. Each consumer acts independently even if it shares a groupId with another consumer. To avoid offset commit conflicts, you should usually ensure that the groupId is unique for each consumer instance.
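To make the difference concrete, here is a toy model in plain Java. It is only a sketch and does not use the Kafka client: the class AssignmentModel and the methods groupManaged and manual are hypothetical names invented for illustration, and the round-robin split is an assumption standing in for whichever assignor the group actually uses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not the Kafka client API) contrasting group-managed and manual assignment. */
final class AssignmentModel {

    /**
     * Group coordination: the partitions of a topic are divided among the
     * members of one group, so each partition has exactly one owner.
     * Round-robin is used here purely for illustration.
     */
    static Map<String, List<Integer>> groupManaged(List<String> members, int partitionCount) {
        Map<String, List<Integer>> owned = new LinkedHashMap<>();
        for (String member : members) {
            owned.put(member, new ArrayList<>());
        }
        if (members.isEmpty()) {
            return owned; // no members, nothing to assign
        }
        for (int p = 0; p < partitionCount; p++) {
            owned.get(members.get(p % members.size())).add(p);
        }
        return owned;
    }

    /**
     * Manual assignment: every consumer reads exactly the partitions it asked for.
     * Nothing is coordinated, so two consumers may end up reading the same partition.
     */
    static Map<String, List<Integer>> manual(Map<String, List<Integer>> requested) {
        Map<String, List<Integer>> owned = new LinkedHashMap<>();
        requested.forEach((consumer, parts) -> owned.put(consumer, new ArrayList<>(parts)));
        return owned;
    }

    public static void main(String[] args) {
        // Two members of one group share three partitions between them.
        System.out.println(groupManaged(Arrays.asList("c1", "c2"), 3)); // {c1=[0, 2], c2=[1]}

        // Two manually assigned consumers: no sharing, overlap is possible.
        Map<String, List<Integer>> requested = new LinkedHashMap<>();
        requested.put("c1", Arrays.asList(0, 2));
        requested.put("c2", Arrays.asList(0, 1, 2));
        System.out.println(manual(requested)); // {c1=[0, 2], c2=[0, 1, 2]}
    }
}
```

In the manual case both consumers read partitions 0 and 2. If they also shared a groupId they would commit offsets for the same partitions, which is the conflict the quoted documentation warns about.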
Finally, the listener is not consuming messages because of the offset. By default the offset is set to latest, so you need to specify the starting offset explicitly, like this:
@KafkaListener(id = "thing2", topicPartitions = {
    @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
    @TopicPartition(topic = "topic2", partitions = "0",
        partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
    ...
}
For more information about offsets:
The first constructor takes an array of TopicPartitionOffset arguments to explicitly instruct the container about which partitions to use (using the consumer assign() method) and with an optional initial offset. A positive value is an absolute offset by default. A negative value is relative to the current last offset within a partition by default. A constructor for TopicPartitionOffset that takes an additional boolean argument is provided. If this is true, the initial offsets (positive or negative) are relative to the current position for this consumer. The offsets are applied when the container is started.
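The rules in that paragraph can be sketched in plain Java as follows. This is a simplified model based only on the description above, not the Spring for Apache Kafka implementation: InitialOffsetModel and resolve are hypothetical names, and clamping the result at 0 is an assumption added so the model never yields a negative offset.

```java
/** Simplified model of how an initial offset is resolved, per the quoted description. */
final class InitialOffsetModel {

    /**
     * @param initialOffset     the configured offset (may be negative)
     * @param relativeToCurrent the additional boolean flag described above
     * @param currentPosition   where this consumer currently is in the partition
     * @param endOffset         the current last offset of the partition
     * @return the offset the consumer would start reading from
     */
    static long resolve(long initialOffset, boolean relativeToCurrent,
                        long currentPosition, long endOffset) {
        long target;
        if (relativeToCurrent) {
            // Positive or negative: relative to the consumer's current position.
            target = currentPosition + initialOffset;
        } else if (initialOffset < 0) {
            // Negative: relative to the current last offset of the partition.
            target = endOffset + initialOffset;
        } else {
            // Positive: an absolute offset.
            target = initialOffset;
        }
        // Assumption: never seek before offset 0.
        return Math.max(0L, target);
    }

    public static void main(String[] args) {
        System.out.println(resolve(100, false, 40, 250)); // 100 (absolute)
        System.out.println(resolve(-10, false, 40, 250)); // 240 (10 before the end)
        System.out.println(resolve(5, true, 40, 250));    // 45  (5 past the current position)
        System.out.println(resolve(-50, true, 40, 250));  // 0   (clamped)
    }
}
```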
Note: You can specify each partition in either the partitions attribute or the partitionOffsets attribute, but not in both.