apache-kafka, spring-kafka

Can we use the same container for different topics?


I have two Kafka topics. The data type and JSON structure of the messages are the same in both. Can I use the same listener container for both topics? I tried it and was able to consume records from both topics.

Below is my configuration.

@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
    ConcurrentKafkaListenerContainerFactoryConfigurer factoryConfigure,
    ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factoryConfigure.configure(factory, kafkaConsumerFactory);
    factory.setBatchListener(true);
    return factory;
}

These are my listeners.

// Batch listener for the first topic.
@KafkaListener(id = "#{'${spring.kafka.listener.id}'}", topics = "#{'${spring.kafka.consumer.topic}'}", groupId = "#{'${spring.kafka.consumer.group-id}'}")
public void getTopics(List<Request> model) {
    System.out.println("I am from topic1");
    System.out.println(model);
}

// Batch listener for the second topic.
@KafkaListener(id = "#{'${spring.kafka.listener.id2}'}", topics = "#{'${spring.kafka.consumer.topic2}'}", groupId = "#{'${spring.kafka.consumer.group-id2}'}")
public void getTopics2(List<Request> model) {
    System.out.println("I am from topic2");
    System.out.println(model);
}

I am not explicitly using any annotation to name the container. Can that lead to any issues?


Solution

  • It is not clear what you are asking; the container factory creates a separate container for each @KafkaListener method, so your two listeners do not share a container.

    The topics property is an array, so a single listener can subscribe to both topics:

    @KafkaListener(id = "#{'${spring.kafka.listener.id}'}",
        topics = { "${spring.kafka.consumer.topic}",
                   "${spring.kafka.consumer.topic2}" },
        groupId = "#{'${spring.kafka.consumer.group-id}'}")
    public void getTopics(List<Request> model) {
        System.out.println("I am from both topics");
        System.out.println(model);
    }

    If you need to know which topic each record came from, you can add a header parameter:

    public void getTopics(List<Request> model,
            @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics) {
        // topics.get(i) is the topic that model.get(i) was consumed from
    }

    The topic at index n corresponds to the record at index n of model.
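
As an illustration of that index pairing, here is a minimal plain-Java sketch (no Spring involved) of how the two parallel lists could be combined into per-topic groups inside the listener body. The class name `TopicGrouping`, the helper `groupByTopic`, and the sample values are hypothetical and not part of spring-kafka; the sketch assumes both lists always have the same length.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: pairs each batch element with the topic at the same index.
final class TopicGrouping {

    // Groups records by topic, keeping the order in which topics first appear.
    static <T> Map<String, List<T>> groupByTopic(List<T> model, List<String> topics) {
        if (model.size() != topics.size()) {
            throw new IllegalArgumentException("model and topics must have the same size");
        }
        Map<String, List<T>> byTopic = new LinkedHashMap<>();
        for (int i = 0; i < model.size(); i++) {
            // topics.get(i) names the topic that model.get(i) was read from
            byTopic.computeIfAbsent(topics.get(i), t -> new ArrayList<>()).add(model.get(i));
        }
        return byTopic;
    }

    public static void main(String[] args) {
        // Stand-in data: strings take the place of the Request payloads.
        List<String> model = List.of("a", "b", "c");
        List<String> topics = List.of("topic1", "topic2", "topic1");
        groupByTopic(model, topics).forEach((topic, records) ->
                System.out.println(topic + " -> " + records));
    }
}
```

In a listener like the one above, the call would presumably be `TopicGrouping.groupByTopic(model, topics)`, after which each group can be handled according to its topic.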