I have two different topics for Kafka. The datatype and structure of data (Json) is same in both topics. Can I use same listener container for both the topics? I tried using and I was able to get the output consumed from both topics.
Below are my configurations.
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer factoryConfigure,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factoryConfigure.configure(factory, kafkaConsumerFactory);
factory.setBatchListener(true);
return factory;
}
This is my listener.
@KafkaListener(id = "#{'${spring.kafka.listener.id}'}", topics = "#{'${spring.kafka.consumer.topic}'}", groupId = "#{'${spring.kafka.consumer.group-id}'}")
public void getTopics( List<Request> model) {
System.out.println("I am from topic1");
System.out.println(model);
}
@KafkaListener(id = "#{'${spring.kafka.listener.id2}'}", topics = "#{'${spring.kafka.consumer.topic2}'}", groupId = "#{'${spring.kafka.consumer.group-id2}'}")
public void getTopics( List<Request> model) {
System.out.println("I am from topic2");
System.out.println(model);
}
I am not explicitly using any annotation to give name of container. Can that lead me to any issue?
It is not clear what you are asking; the container factory will create a different container for each listener.
The topics
property is an array so you can use
@KafkaListener(id = "#{'${spring.kafka.listener.id}'}",
topics = { "${spring.kafka.consumer.topic}",
"${spring.kafka.consumer.topic2}" },
groupId = "#{'${spring.kafka.consumer.group-id}'}")
public void getTopics( List<Request> model) {
System.out.println("I am from both topics");
System.out.println(model);
}
If you need to know which topic each record came from, you can use
public void getTopics( List<Request> model,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics) {
The topic at index n is for the record at model index n.