I ran into something interesting with the KafkaProducer class from the org.apache.kafka.clients.producer package. According to the documentation, the flush() method should block until all messages have been sent.
I have this code:
messageSender.flush();
messageSender.close();
The messageSender.flush() method calls flush() on each of my producers:
public void flush() {
producers.forEach(Producer::flush);
}
Before running the first code block, I send some messages with the send() method. But afterwards I see that not all messages were sent before the producers closed.
If I change the first code block to:
messageSender.flush();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
messageSender.close();
all messages are sent.
What am I doing wrong?
My producers have the following configs:
(ProducerConfig.BATCH_SIZE_CONFIG, "65536");
(ProducerConfig.ACKS_CONFIG, "0");
(ProducerConfig.LINGER_MS_CONFIG, "40");
(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
Flush
Your main thread won't be blocked for long by producer.flush(). That's because with ACKs set to 0, the producer acts more like a UDP sender: once send() has been called, the message is treated as successfully completed, without any guarantee of delivery. So there is no real block-until-received mechanism with this configuration, and the buffers that flush() would empty are already almost empty.
Close
The other call that should block your main thread until all messages are sent is the close() method. From the docs:
close()
This method blocks until all previously sent requests complete.
By this logic, you are right that the code should wait until all messages are sent. The issue in your configuration seems to be the ACKS=0 property.
ACKs
With a value of 0, the producer won’t even wait for a response from the broker. It immediately considers the write successful the moment the record is sent out.
What may be happening is this: close() will wait until all requests are complete, but with an ACK of 0 it gets a misleading picture of which messages were sent successfully. The producer won't wait for an acknowledgment, so it will assume every message was sent correctly; the close() operation will then go ahead immediately, without any real guarantee that the requests completed. In practice, this mainly affects the most recently sent requests.
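To make the setting concrete, here is a minimal sketch of the question's producer configuration with the acks value as a parameter. It uses plain string keys, which are the values behind the ProducerConfig constants, so that it compiles without the Kafka client on the classpath. The class and method names (AcksConfigSketch, producerProps) are made up for illustration, and bootstrap servers and serializers are left out because the question does not show them.

```java
import java.util.Properties;

public class AcksConfigSketch {

    /** Builds the question's producer settings with the given acks value. */
    static Properties producerProps(String acks) {
        Properties props = new Properties();
        props.put("batch.size", "65536");     // ProducerConfig.BATCH_SIZE_CONFIG
        props.put("linger.ms", "40");         // ProducerConfig.LINGER_MS_CONFIG
        props.put("compression.type", "lz4"); // ProducerConfig.COMPRESSION_TYPE_CONFIG
        // ProducerConfig.ACKS_CONFIG:
        //   "0"   -> do not wait for any broker response
        //   "1"   -> wait for the partition leader to acknowledge the write
        //   "all" -> wait for the full set of in-sync replicas
        props.put("acks", acks);
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("1");
        System.out.println("acks = " + props.getProperty("acks"));
    }
}
```

In a real application these properties, plus the connection and serializer settings, would be passed to the KafkaProducer constructor.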
What I would try:
ACKs = 1 (at least)
This will make both flush() and close() wait a little longer, as the producer will need an ack from the broker before it marks a request as successful.
producer.close(500, TimeUnit.MILLISECONDS)
This close method waits up to the timeout for the producer to finish sending all incomplete requests. This will also help with those last messages, and is the "official" way to do what you are doing manually with your sleep. Set it to whatever wait time you need. Newer client versions take a Duration instead: producer.close(Duration.ofMillis(500)).
Use a single producer
This is not only recommended in the source comments, but will also help general performance:
The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.
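The last two suggestions can be combined in one shutdown routine: one shared producer, flushed and then closed with a bounded wait. The sketch below is only a structural illustration. ProducerLike is a hypothetical stand-in that mirrors the flush() and close(Duration) methods of Kafka's Producer interface so the example runs without the Kafka client, and MessageSender.shutdown and RecordingProducer are invented names, not the asker's actual code.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class ShutdownSketch {

    /** Hypothetical stand-in for the two Kafka Producer methods used here. */
    interface ProducerLike {
        void flush();
        void close(Duration timeout);
    }

    /** Wraps one shared producer and shuts it down in a fixed order. */
    static final class MessageSender {
        private final ProducerLike producer;

        MessageSender(ProducerLike producer) {
            this.producer = producer;
        }

        /** Flushes buffered records, then closes with a bounded wait. */
        void shutdown(Duration timeout) {
            producer.flush();
            producer.close(timeout);
        }
    }

    /** Fake producer that only records which calls it received. */
    static final class RecordingProducer implements ProducerLike {
        final List<String> calls = new ArrayList<>();

        @Override
        public void flush() {
            calls.add("flush");
        }

        @Override
        public void close(Duration timeout) {
            calls.add("close:" + timeout.toMillis());
        }
    }

    public static void main(String[] args) {
        RecordingProducer fake = new RecordingProducer();
        new MessageSender(fake).shutdown(Duration.ofMillis(500));
        System.out.println(fake.calls);
    }
}
```

With a real KafkaProducer in place of the fake, the timeout passed to close replaces the manual Thread.sleep(500) from the question.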