Search code examples
javaapache-kafkaspring-kafka

Bootstrap broker not being used to consume from topic


A simple spring-boot-kafka which consumes from a topic on a network cluster:

Errors:

Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

Puzzle:

The configured broker is not local, it's BROKER_1.FOO.NET:9094, and it is available.

pom.xml

  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.1</version>
  </dependency>

config class:

@Slf4j
@EnableKafka
@Configuration
@PropertySource("dv/application.properties")
public class KafkaConfig {
    
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "BROKER_1.FOO.NET:9094");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

Consumer Class:

@Slf4j
@Component
public class KafkaConsumer {

    @KafkaListener(topics = "foo_topic", groupId = "group_1")
    public void consume(String message) {
        log.info(message);
    }
}

Solution

  • Despite the ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG setting, the correct boostrap server wasn't being used. The ConsumerConfig values in the log was: bootstrap.servers = [localhost:9092].

    I needed to change the Bean name from "concurrentKafkaListenerContainerFactory" to "kafkaListenerContainerFactory". So:

    @Bean
    public ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory() {
    

    needed to change to:

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    

    Once I made this single change, the bootstrap server were correctly recognized for the configured listener.

    Now, the ConsumerConfig log shows:

    bootstrap.servers=[BROKER_1.FOO.NET:9094]

    Moreover, the bootstrap server doesn't need to configure advertised.listeners, despite the implication of this article: https://www.confluent.io/blog/kafka-listeners-explained/.

    When I asked our Kafka admin about adding advertised.listeners, he told me that he had actually removed them for simplicity and sent me this configuration description:

    advertised.listeners

    use, if different than the listeners config property. In IaaS environments, this may need to be different from the interface to which the broker binds. If this is not set, the value for listeners will be used. Unlike listeners, it is not valid to advertise the 0.0.0.0 meta-address. Also unlike listeners, there can be duplicated ports in this property, so that one listener can be configured to advertise another listener's address. This can be useful in some cases where external load balancers are used.Listeners to publish to ZooKeeper for clients to

    He is correct because I am getting the correct the expected meta-data when using kafkacat:

    kafkacat -b BROKER_1.FOO.NET:9094 -L 
    

    which returns :

    Metadata for all topics (from broker 2: sasl_ssl://xx.foo.net:9094/2):   
    5 brokers:   
    broker 5 at xx22.ttgtpmg.net:9094 (controller)    
    broker 4 at xx21.ttgtpmg.net:9094   
    broker 1 at xx18.ttgtpmg.net:9094   
    broker 2 at xx19.ttgtpmg.net:9094   
    broker 3 at xx20.ttgtpmg.net:9094