In case that Kafka server is (temporarily) down, my Spring Boot application ReactiveKafkaConsumerTemplate
keeps trying to connect unsuccessfully, thus causing unnecessary traffic and messing the log files:
2021-11-10 14:45:30.265 WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-group-1, groupId=consumer-group] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
2021-11-10 14:45:32.792 WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-group-1, groupId=consumer-group] Bootstrap broker localhost:29092 (id: -1 rack: null) disconnected
2021-11-10 14:45:34.845 WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-group-1, groupId=consumer-group] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
2021-11-10 14:45:34.845 WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-group-1, groupId=consumer-group] Bootstrap broker localhost:29092 (id: -1 rack: null) disconnected
Is it possible to use something like a circuit breaker (an inspiration here or here), so the Spring Boot Kafka client in case of a failure (or even better a few consecutive failures) slows down the pace of its connection attempts, and returns to the normal pace only after the server is up again?
Is there already a ready-made config parameter, or any other solution?
I am aware of the parameter reconnect.backoff.ms
, this is how I create the ReactiveKafkaConsumerTemplate
bean:
@Bean
public ReactiveKafkaConsumerTemplate<String, MyEvent> kafkaConsumer(KafkaProperties properties) {
final Map<String, Object> map = new HashMap<>(properties.buildConsumerProperties());
map.put(ConsumerConfig.GROUP_ID_CONFIG, "MyGroup");
map.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, 10_000L);
final JsonDeserializer<DisplayCurrencyEvent> jsonDeserializer = new JsonDeserializer<>();
jsonDeserializer.addTrustedPackages("com.example.myapplication");
return new ReactiveKafkaConsumerTemplate<>(
ReceiverOptions
.<String, MyEvent>create(map)
.withKeyDeserializer(new ErrorHandlingDeserializer<>(new StringDeserializer()))
.withValueDeserializer(new ErrorHandlingDeserializer<>(jsonDeserializer))
.subscription(List.of("MyTopic")));
}
And still the consumer is trying to connect every 3 seconds.
See https://kafka.apache.org/documentation/#consumerconfigs_retry.backoff.ms
The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker.
and https://kafka.apache.org/documentation/#consumerconfigs_reconnect.backoff.max.ms
The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms.
and