Tags: spring, apache-kafka, kafka-producer-api, spring-kafka

Kafka keeps producing requests even if broker is down


Currently, when I create a producer to send my records and Kafka is unavailable for some reason, the producer keeps retrying the same message indefinitely. How can I stop producing messages, for example after receiving this error 3 times:

Connection to node -1 could not be established. Broker may not be available.

I'm using the Reactor Kafka producer:

    @Bean
    public KafkaSender<String, String> createSender() {
        return KafkaSender.create(senderOptions());
    }

    private SenderOptions<String, String> senderOptions() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProperties.getClientId());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.RETRIES_CONFIG, kafkaProperties.getProducerRetries());
        return SenderOptions.create(props);
    }

and then use it to send a record:

sender.send(Mono.just(SenderRecord.create(new ProducerRecord<>(topicName, null, message), message)))
            .flatMap(result -> {
                if (result.exception() != null) {
                    return Flux.just(ResponseEntity.badRequest()
                        .body(result.exception().getMessage()));
                }
                return Flux.just(ResponseEntity.ok().build());
            })
            .next();

Solution

  • I'm afraid the clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs); call inside KafkaProducer.send() is not covered by the retry logic: it keeps waiting for broker metadata until maxBlockTimeMs elapses (60000 ms by default). You can decrease this timeout for the producer via the ProducerConfig.MAX_BLOCK_MS_CONFIG property:

    public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";
    private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long <code>KafkaProducer.send()</code> and <code>KafkaProducer.partitionsFor()</code> will block."
                                                    + "These methods can be blocked either because the buffer is full or metadata unavailable."
                                                    + "Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.";
    
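    For example, here is a sketch of the senderOptions() method from the question with the timeout lowered to 5 seconds (the 5000 value is only an illustration; pick whatever fits your latency budget):

    private SenderOptions<String, String> senderOptions() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Give up on send() after 5 seconds instead of the 60-second default
        // when the broker metadata cannot be fetched.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
        return SenderOptions.create(props);
    }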

    UPDATE

    We can fix the problem like this:

    @PostMapping(path = "/v1/{topicName}")
    public Mono<ResponseEntity<?>> postData(
        @PathVariable("topicName") String topicName, @RequestBody String message) {
        return sender.send(Mono.just(SenderRecord.create(new ProducerRecord<>(topicName, null, message), message)))
            .flatMap(result -> {
                if (result.exception() != null) {
                    // Stop producing: close the underlying producer after a failed send.
                    sender.close();
                    return Flux.just(ResponseEntity.badRequest()
                        .body(result.exception().getMessage()));
                }
                return Flux.just(ResponseEntity.ok().build());
            })
            .next();
    }
    

    Pay attention to the sender.close(); call in the error case.

    I think it's time to raise an issue against the Reactor Kafka project to allow closing the producer on error.
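
    In the meantime, the "stop after 3 errors" requirement from the question can be approximated on the application side. A minimal sketch, assuming the sender bean from the question plus a hypothetical java.util.concurrent.atomic.AtomicInteger failure counter (the counter field and the threshold of 3 are illustrative, not Reactor Kafka APIs):

    private final AtomicInteger consecutiveFailures = new AtomicInteger();

    public Mono<ResponseEntity<?>> sendWithLimit(String topicName, String message) {
        return sender.send(Mono.just(SenderRecord.create(new ProducerRecord<>(topicName, null, message), message)))
            .flatMap(result -> {
                if (result.exception() != null) {
                    // Close the producer once 3 sends in a row have failed.
                    if (consecutiveFailures.incrementAndGet() >= 3) {
                        sender.close();
                    }
                    return Flux.just(ResponseEntity.badRequest()
                        .body(result.exception().getMessage()));
                }
                consecutiveFailures.set(0); // reset the counter on success
                return Flux.just(ResponseEntity.ok().build());
            })
            .next();
    }

    Alternatively, SenderOptions.stopOnError(true) makes the send Flux terminate with an error on the first failure, which lets you apply standard Reactor operators such as retry(3) instead of inspecting result.exception() per record.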