This is related to Is there a "Circuit Breaker" for Spring Boot Kafka client?, but I still think it's a different question :)
We need to configure Spring Boot Kafka client so it does not try to connect at all.
The use case is that in the test environment we do not have Kafka running but we still need to build the complete Spring Boot context, so making this bean conditional on profile would not work. We don't care if the been is not connected, but we need it to be existing.
The problem is that the unsuccessful attempts to connect take about 30-40 seconds and our tests are significantly slowed down.
Which of the configuration parameters or which combination of them completely forbids the connection attempts, or at least forces the client to try it only once?
The code which retries to connect many times is this:
@Bean
public KafkaAdmin.NewTopics topics() {
return new KafkaAdmin.NewTopics(
TopicBuilder.name("MyTopic").build()
);
}
It repeatedly produces this warning:
WARN ... org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
The following code tries to connect only once:
@Bean
public ReactiveKafkaConsumerTemplate<String, MyEvent> myConsumer(KafkaProperties properties) {
return createConsumer(properties, "MyTopic", "MyConsumerGroup");
}
public <E> ReactiveKafkaConsumerTemplate<String, E> createConsumer(KafkaProperties properties, String topic, String consumerGroup) {
final Map<String, Object> map = configureKafkaProperties(properties, consumerGroup);
return new ReactiveKafkaConsumerTemplate<>(
ReceiverOptions.<String, E>create(map)
.subscription(List.of(topic)));
}
produces
WARN 7268 ... org.apache.kafka.clients.NetworkClient : Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
I have also tried setting the property
spring.kafka.admin.fail-fast=true
but this seem to have no effect at all.
Spring Boot auto configures a KafkaAdmin
which, by default, will connect to the broker to create any NewTopic
beans. You can set its autoCreate
property to false.
/**
* Set to false to suppress auto creation of topics during context initialization.
* @param autoCreate boolean flag to indicate creating topics or not during context initialization
* @see #initialize()
*/
public void setAutoCreate(boolean autoCreate) {
EDIT
To get a reference to the KafkaAdmin
, simply add it as a parameter to any bean definition.
e.g.
@Bean
public KafkaAdmin.NewTopics topics(KafkaAdmin admin) {
admin.setAutoCreate(false);
return new KafkaAdmin.NewTopics(
TopicBuilder.name("MyTopic").build()
);
}
Also see KafkaAdmin.initialize()
.
/**
* Call this method to check/add topics; this might be needed if the broker was not
* available when the application context was initialized, and
* {@link #setFatalIfBrokerNotAvailable(boolean) fatalIfBrokerNotAvailable} is false,
* or {@link #setAutoCreate(boolean) autoCreate} was set to false.
* @return true if successful.
* @see #setFatalIfBrokerNotAvailable(boolean)
* @see #setAutoCreate(boolean)
*/
public final boolean initialize() {
When using @KafkaListener
set autoStartup = "false"
to prevent the consumers from starting when the context is initialized.
With reactor, just don't subscribe to the Flux
returned by the receive*()
methods (that's what triggers the consumer creation).