Tags: apache-kafka, kafka-consumer-api, spring-kafka

@KafkaListener vs ConsumerFactory groupId


I followed the "Intro to Apache Kafka with Spring" tutorial on baeldung.com. I set up a KafkaConsumerConfig class with a kafkaConsumerFactory method:

private ConsumerFactory<String, String> kafkaConsumerFactory(String groupId) {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    ...
    return new DefaultKafkaConsumerFactory<>(props);
}

and two "custom" factories:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
    return kafkaListenerContainerFactory("foo");
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> barKafkaListenerContainerFactory() {
    return kafkaListenerContainerFactory("bar");
}

In the MessageListener class, I instead used the @KafkaListener annotation to register consumers with a given groupId to listen on a topic:

@KafkaListener(topics = "${message.topic.name}", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
public void listenGroupFoo(String message) {
    System.out.println("Received Message in group 'foo': " + message);
    ...
}

@KafkaListener(topics = "${message.topic.name}", groupId = "bar", containerFactory = "barKafkaListenerContainerFactory")
public void listenGroupBar(String message) {
    System.out.println("Received Message in group 'bar': " + message);
    ...
}

This way there are two groups of consumers: those with groupId "foo" and those with groupId "bar".

Now, if I change the container factory for the "foo" consumers from fooKafkaListenerContainerFactory to barKafkaListenerContainerFactory, like this:

@KafkaListener(topics = "${message.topic.name}", groupId = "foo", containerFactory = "barKafkaListenerContainerFactory")
public void listenGroupFoo(String message) {
    ...
}

There seems to be a mismatch between the groupId on the @KafkaListener and the groupId of the container factory, yet nothing changes. So what I'm trying to understand is what props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); does and why it seems to be ignored.


Solution

  • The factory groupId is a default; it is used only if the @KafkaListener has no groupId (and no id that acts as the group, see idIsGroup below).

    In early versions, the groupId could only be set on the factory, so you needed a separate factory for each listener whenever different groups were required. That defeats the idea of a factory that can be shared by multiple listeners.

    See the javadocs...

    /**
     * Override the {@code group.id} property for the consumer factory with this value
     * for this listener only.
     * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
     * @return the group id.
     * @since 1.3
     */
    String groupId() default "";
    
    /**
     * When {@link #groupId() groupId} is not provided, use the {@link #id() id} (if
     * provided) as the {@code group.id} property for the consumer. Set to false, to use
     * the {@code group.id} from the consumer factory.
     * @return false to disable.
     * @since 1.3
     */
    boolean idIsGroup() default true;
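
To make the precedence concrete, here is a minimal plain-Java sketch of the rule those javadocs describe. It is a simplified model, not Spring Kafka's actual implementation: the class GroupIdResolution and the method resolve are hypothetical names made up for illustration, and the only inputs are the factory's group.id plus the groupId, id and idIsGroup attributes of the listener.

```java
/**
 * Simplified model of the group.id precedence described in the javadocs above.
 * This is NOT Spring Kafka code; the class and method names are hypothetical.
 */
final class GroupIdResolution {

    private GroupIdResolution() {
    }

    /**
     * Returns the group.id a listener would effectively use, given the factory
     * default and the listener's groupId, id and idIsGroup attributes.
     */
    static String resolve(String factoryGroupId, String listenerGroupId,
                          String listenerId, boolean idIsGroup) {
        // 1. An explicit groupId on the listener overrides the factory's group.id.
        if (hasText(listenerGroupId)) {
            return listenerGroupId;
        }
        // 2. Otherwise the listener id doubles as the group, unless idIsGroup is false.
        if (idIsGroup && hasText(listenerId)) {
            return listenerId;
        }
        // 3. Only when neither applies is the factory default used.
        return factoryGroupId;
    }

    private static boolean hasText(String s) {
        return s != null && !s.trim().isEmpty();
    }

    public static void main(String[] args) {
        // The modified listener from the question: groupId = "foo", factory configured with "bar".
        System.out.println(resolve("bar", "foo", "", true));         // foo
        // No groupId, but an id and the default idIsGroup = true.
        System.out.println(resolve("bar", "", "myListener", true));  // myListener
        // Same listener with idIsGroup = false falls back to the factory.
        System.out.println(resolve("bar", "", "myListener", false)); // bar
        // Neither groupId nor id on the listener.
        System.out.println(resolve("bar", "", "", true));            // bar
    }
}
```

The first call mirrors the modified listener in the question: groupId = "foo" is set on the annotation, so the "bar" configured in barKafkaListenerContainerFactory is never consulted, which is why nothing appeared to change.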