Search code examples
apache-kafkakafka-consumer-apispring-kafka

How to pass multiple bootstrap servers for listener using spring-kafka


I have a listener that need to read from multiple kafka servers with the same topic which are all configured under one zookeeper. How do I read from those multiple servers. Can you please help with this.

Instead of Kafka servers, can I point to zookeeper instead?


Solution

  • The @KafkaListener requires KafkaListenerContainerFactory @Bean, which, in turn, is based on the ConsumerFactory. And the DefaultKafkaConsumerFactory accepts a Map<String, Object> of consumer configs:

    @Configuration
    @EnableKafka
    public class KafkaConfig {
    
        @Bean
        KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                            kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    
        @Bean
        public ConsumerFactory<Integer, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
    
        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
            ...
            return props;
        }
    }
    

    https://docs.spring.io/spring-kafka/docs/1.2.2.RELEASE/reference/html/_reference.html#__kafkalistener_annotation

    Where that ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG is exactly standard Apache Kafka bootstrap.servers property:

    A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).

    No, you can't point Zookeeper address. That isn't supported by Kafka any more.