We have an HTTP endpoint that receives some data and sends it to Kafka. We would like to respond with an error immediately if the broker is down, instead of retrying asynchronously. Is this possible at all?
What we are doing is starting the application, shutting down the broker and sending messages to see what happens. We are sending the messages using the blocking option described here.
    public void sendToKafka(final MyOutputData data) {
        final ProducerRecord<String, String> record = createRecord(data);
        try {
            template.send(record).get(10, TimeUnit.SECONDS);
            handleSuccess(data);
        }
        catch (ExecutionException e) {
            handleFailure(data, record, e.getCause());
        }
        catch (TimeoutException | InterruptedException e) {
            handleFailure(data, record, e);
        }
    }
When the broker goes down, we get the following warning:
[Producer clientId=producer-1] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
After the time limit runs out, we get a TimeoutException error. But what we would like to do is catch the error as soon as the send is attempted. We have configured retries=0.
I would like to understand what happens when we send a message to Kafka while the broker is down.
This is the producer config:
acks = -1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 30000
enable.idempotence = true
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.UUIDSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 50
reconnect.backoff.ms = 50
request.timeout.ms = 10000
retries = 0
retry.backoff.ms = 200
transaction.timeout.ms = 60000
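producer.send puts the record into an internal buffer, and a background I/O thread sends it to the broker asynchronously. Calling .get() on the returned future does not force the send; it only blocks until the send completes or fails. With the broker down, no request can be made at all, so retries=0 has no effect: the record waits in the buffer until delivery.timeout.ms (30 seconds in your config) or the timeout passed to .get() (10 seconds) expires, whichever comes first.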
If you need to detect a connection problem before calling .send, then you need to actually make the connection beforehand, for example, using an AdminClient.describeCluster method call.
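A minimal sketch of such a pre-check, assuming the kafka-clients library is on the classpath. The class name BrokerCheck, the method isBrokerReachable, and the 2-second timeout are illustrative choices, not part of your code or of any Kafka API:

```java
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeClusterOptions;

// Hypothetical helper: asks the cluster to describe itself before a send is attempted.
public class BrokerCheck {

    // Returns true if a describeCluster request is answered within timeoutMs.
    public static boolean isBrokerReachable(String bootstrapServers, int timeoutMs) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        try (AdminClient admin = AdminClient.create(props)) {
            admin.describeCluster(new DescribeClusterOptions().timeoutMs(timeoutMs))
                 .nodes()
                 .get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (ExecutionException | TimeoutException e) {
            // No broker answered in time: treat the cluster as unavailable.
            return false;
        } catch (InterruptedException e) {
            // Preserve the interrupt flag for the caller.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Address taken from bootstrap.servers in the question; the timeout is an assumption.
        System.out.println(isBrokerReachable("localhost:9092", 2000));
    }
}
```

You could call isBrokerReachable at the start of sendToKafka and return the HTTP error right away when it is false. Note that this only narrows the window: the broker can still go down between the check and the send, so the timeout handling around .get() is still needed. Creating an AdminClient per request is also relatively expensive, so in practice you would probably reuse one instance.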