I followed the "Intro to Apache Kafka with Spring" tutorial by baeldung.com.
I set up a KafkaConsumerConfig class with the kafkaConsumerFactory method:
private ConsumerFactory<String, String> kafkaConsumerFactory(String groupId) {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    ...
    return new DefaultKafkaConsumerFactory<>(props);
}
and two "custom" factories:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
    return kafkaListenerContainerFactory("foo");
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> barKafkaListenerContainerFactory() {
    return kafkaListenerContainerFactory("bar");
}
In the MessageListener class, I instead used the @KafkaListener annotation to register consumers with the given groupId to listen on a topic:
@KafkaListener(topics = "${message.topic.name}", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
public void listenGroupFoo(String message) {
    System.out.println("Received Message in group 'foo': " + message);
    ...
}

@KafkaListener(topics = "${message.topic.name}", groupId = "bar", containerFactory = "barKafkaListenerContainerFactory")
public void listenGroupBar(String message) {
    System.out.println("Received Message in group 'bar': " + message);
    ...
}
In this way there are two groups of consumers: those with groupId "foo" and those with groupId "bar".
Now suppose I change the container factory for the "foo" consumers from fooKafkaListenerContainerFactory to barKafkaListenerContainerFactory, like this:
@KafkaListener(topics = "${message.topic.name}", groupId = "foo", containerFactory = "barKafkaListenerContainerFactory")
public void listenGroupFoo(String message) {
    ...
}
There seems to be an incompatibility between the groupId of the KafkaListener and the groupId of the container factory, but nothing changes.
So, what I'm trying to understand is what the props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); property does and why it seems not to be taken into account.
The factory groupId is a default that is used only if there is no groupId (or id) on the @KafkaListener.
In early versions, it was only possible to set the groupId on the factory, which meant you needed a separate factory for each listener if different groups were needed. That defeated the idea of a factory that can be shared by multiple listeners.
See the javadocs...
/**
 * Override the {@code group.id} property for the consumer factory with this value
 * for this listener only.
 * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
 * @return the group id.
 * @since 1.3
 */
String groupId() default "";

/**
 * When {@link #groupId() groupId} is not provided, use the {@link #id() id} (if
 * provided) as the {@code group.id} property for the consumer. Set to false, to use
 * the {@code group.id} from the consumer factory.
 * @return false to disable.
 * @since 1.3
 */
boolean idIsGroup() default true;
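
To make the precedence concrete, here is a minimal sketch in plain Java of the rule those javadocs describe. It is only a simplified model, not Spring Kafka's actual code: the class GroupIdResolution and the method resolveGroupId are hypothetical names made up for illustration, and an empty string stands for "attribute not set", mirroring the annotation defaults.

```java
public class GroupIdResolution {

    /**
     * Hypothetical helper (not part of Spring Kafka) that models which
     * group.id a listener ends up with:
     * 1. groupId on @KafkaListener, if set;
     * 2. otherwise id on @KafkaListener, if set and idIsGroup is true;
     * 3. otherwise the group.id configured on the consumer factory.
     */
    public static String resolveGroupId(String listenerGroupId, String listenerId,
                                        boolean idIsGroup, String factoryGroupId) {
        if (hasText(listenerGroupId)) {
            return listenerGroupId;
        }
        if (idIsGroup && hasText(listenerId)) {
            return listenerId;
        }
        return factoryGroupId;
    }

    // True when the value is neither null nor blank.
    private static boolean hasText(String value) {
        return value != null && !value.trim().isEmpty();
    }

    public static void main(String[] args) {
        // The case from the question: groupId = "foo" on the listener,
        // factory configured with "bar". The annotation value is used.
        System.out.println(resolveGroupId("foo", "", true, "bar"));

        // No groupId, but an id (hypothetical "myListener") with idIsGroup = true.
        System.out.println(resolveGroupId("", "myListener", true, "bar"));

        // Same, but idIsGroup = false: the factory default applies.
        System.out.println(resolveGroupId("", "myListener", false, "bar"));

        // Neither groupId nor id on the listener: the factory default applies.
        System.out.println(resolveGroupId("", "", true, "bar"));
    }
}
```

Under this model, the first call corresponds to your modified listener: it stays in group "foo" even though it points at barKafkaListenerContainerFactory, which is why nothing appeared to change.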