Tags: apache-kafka, spring-kafka, kafka-producer-api

Spring Kafka Producer: receive immediate error when broker is down


We have an HTTP endpoint that receives some data and sends it to Kafka. We would like to respond immediately with an error if the broker is down, instead of retrying asynchronously. Is this possible at all?

What we are doing is starting the application, shutting down the broker and sending messages to see what happens. We are sending the messages using the blocking option described here.

public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        // Block until the broker acknowledges the record, or give up after 10 seconds.
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        // The send itself failed; the real cause is wrapped in the ExecutionException.
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        // No acknowledgement within the limit, or the waiting thread was interrupted.
        handleFailure(data, record, e);
    }
}

When the broker goes down, we get the following warning:

[Producer clientId=producer-1] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

Once the 10-second limit on the future expires, we get a TimeoutException. But what we would like is to catch the error as soon as the send is attempted. We have already configured retries=0.

I would like to understand what happens when we send a message to Kafka while the broker is down.

This is the producer config:

    acks = -1
    batch.size = 16384
    bootstrap.servers = [localhost:9092]
    buffer.memory = 33554432
    client.dns.lookup = use_all_dns_ips
    client.id = producer-1
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 30000
    enable.idempotence = true
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.UUIDSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 50
    reconnect.backoff.ms = 50
    request.timeout.ms = 10000
    retries = 0
    retry.backoff.ms = 200
    transaction.timeout.ms = 60000
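
(Of the values above, max.block.ms, request.timeout.ms, and delivery.timeout.ms are what bound how long a send can take to fail. Below is a minimal sketch of tightening them when building the KafkaTemplate; the bootstrap address matches the config above, while the timeout values are illustrative assumptions, not a recommendation.)

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.UUIDSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

// Sketch: a template whose sends give up quickly when no broker is reachable.
public KafkaTemplate<UUID, String> failFastTemplate() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, UUIDSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // How long send() itself may block, e.g. waiting for metadata with no broker up.
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 2000);
    // Per-request timeout; must not exceed delivery.timeout.ms.
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
    // Upper bound on the whole delivery attempt (must be >= linger.ms + request.timeout.ms).
    props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 5000);
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<UUID, String>(props));
}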

Solution

  • producer.send only puts the record into an internal buffer; a background thread actually transmits it to the broker. Calling .get() on the returned future blocks until that delivery succeeds or fails, which is why the error only surfaces there rather than at the moment send is called.

    If you need to detect a connection before calling .send, then you need to actually make the connection beforehand, for example with an AdminClient.describeCluster call, as in the sketch below.
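
A hedged sketch of that pre-flight check; the 2-second timeout and the bootstrap address are illustrative assumptions, not values prescribed by the answer:

import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;

// Probe the cluster before sending; throw so the HTTP layer can answer with an error.
public void assertBrokerReachable() {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "2000");
    try (AdminClient admin = AdminClient.create(props)) {
        // describeCluster() forces a real connection; nodes() resolves only if a broker answers.
        Collection<Node> nodes = admin.describeCluster().nodes().get(2, TimeUnit.SECONDS);
        if (nodes == null || nodes.isEmpty()) {
            throw new IllegalStateException("No Kafka brokers available");
        }
    }
    catch (ExecutionException | TimeoutException e) {
        throw new IllegalStateException("Kafka broker appears to be down", e);
    }
    catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while checking Kafka", e);
    }
}

Note that creating an AdminClient per request is expensive; in practice you would create it once and reuse it. Even then, a successful probe cannot rule out the broker going down between the check and the send, so the blocking .get() path above is still needed as the backstop.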