A simple spring-boot-kafka which consumes from a topic on a network cluster:
Errors:
Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
Puzzle:
The configured broker is not local, it's BROKER_1.FOO.NET:9094, and it is available.
pom.xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.1</version>
</dependency>
config class:
@Slf4j
@EnableKafka
@Configuration
@PropertySource("dv/application.properties")
public class KafkaConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "BROKER_1.FOO.NET:9094");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
Consumer Class:
@Slf4j
@Component
public class KafkaConsumer {
@KafkaListener(topics = "foo_topic", groupId = "group_1")
public void consume(String message) {
log.info(message);
}
}
Despite the ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG setting, the correct boostrap server wasn't being used. The ConsumerConfig values in the log was: bootstrap.servers = [localhost:9092].
I needed to change the Bean name from "concurrentKafkaListenerContainerFactory" to "kafkaListenerContainerFactory". So:
@Bean
public ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory() {
needed to change to:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
Once I made this single change, the bootstrap server were correctly recognized for the configured listener.
Now, the ConsumerConfig log shows:
bootstrap.servers=[BROKER_1.FOO.NET:9094]
Moreover, the bootstrap server doesn't need to configure advertised.listeners, despite the implication of this article: https://www.confluent.io/blog/kafka-listeners-explained/.
When I asked our Kafka admin about adding advertised.listeners, he told me that he had actually removed them for simplicity and sent me this configuration description:
advertised.listeners
use, if different than the listeners config property. In IaaS environments, this may need to be different from the interface to which the broker binds. If this is not set, the value for listeners will be used. Unlike listeners, it is not valid to advertise the 0.0.0.0 meta-address. Also unlike listeners, there can be duplicated ports in this property, so that one listener can be configured to advertise another listener's address. This can be useful in some cases where external load balancers are used.Listeners to publish to ZooKeeper for clients to
He is correct because I am getting the correct the expected meta-data when using kafkacat:
kafkacat -b BROKER_1.FOO.NET:9094 -L
which returns :
Metadata for all topics (from broker 2: sasl_ssl://xx.foo.net:9094/2):
5 brokers:
broker 5 at xx22.ttgtpmg.net:9094 (controller)
broker 4 at xx21.ttgtpmg.net:9094
broker 1 at xx18.ttgtpmg.net:9094
broker 2 at xx19.ttgtpmg.net:9094
broker 3 at xx20.ttgtpmg.net:9094