Tags: apache-kafka, spring-kafka, spring-cloud-stream

Spring Cloud Stream Kafka Producer force producer.flush


I am creating a Kafka streams-style (in/out) application. The sample code looks like the following:

private MessageChannel output;

public void process(List<String> input) {
    // ... some logic ...
    output.send(/* message */);
}

Based on my understanding, the Kafka producer buffers messages before sending them out. If the app or container crashes, there is a possibility of losing the buffered messages.

How can we ensure that the messages are actually sent out (something like KafkaTemplate.flush() here)?
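For context, how long records sit in the producer's buffer is governed by standard Kafka producer settings, which the binder lets you pass through. A hedged sketch of the relevant properties (names per the Kafka producer and Spring Cloud Stream Kafka binder documentation; the values shown are the Kafka defaults, not recommendations):

    # Wait for full acknowledgement from all in-sync replicas
    spring.cloud.stream.kafka.binder.requiredAcks=all
    # How long the producer may wait to fill a batch before sending (ms)
    spring.cloud.stream.kafka.binder.configuration.linger.ms=0
    # Maximum batch size in bytes before a batch is sent
    spring.cloud.stream.kafka.binder.configuration.batch.size=16384

Note these settings only bound how long records linger in the buffer; they do not remove the window in which a crash can lose unsent records, which is why an explicit flush is needed.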

EDIT: Based on the suggestion from Gary Russell, we should set the FLUSH header on the last message.

Follow-up question: given that the last send() call becomes blocking because of the blocking nature of the kafkaProducer.flush() method, will exceptions thrown while sending the Kafka message (e.g. an IO exception or auth exception) be raised in the same method context?

For example, in the following code, will a Kafka sender exception be caught in the catch block?

public void process(List<String> input) {
    // ... some logic ...
    try {
        // Spring Messages are immutable, so the header must be set via a builder
        Message<?> message = MessageBuilder.withPayload(/* ... */)
                .setHeader(KafkaIntegrationHeaders.FLUSH, true)
                .build();
        output.send(message);
    }
    catch (Exception e) {
        e.printStackTrace();
    }
}

Solution

  • The underlying KafkaProducerMessageHandler has a property:

    /**
     * Specify a SpEL expression that evaluates to a {@link Boolean} to determine whether
     * the producer should be flushed after the send. Defaults to looking for a
     * {@link Boolean} value in a {@link KafkaIntegrationHeaders#FLUSH} header; false if
     * absent.
     * @param flushExpression the {@link Expression}.
     */
    public void setFlushExpression(Expression flushExpression) {
        ...
    }

    Currently, the binder doesn't support customizing this property.

    However, if you are sending Message<?>s, you can set the KafkaIntegrationHeaders.FLUSH header to true on the last message in the batch.
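    Putting that together, the batch loop above can be sketched as follows. This is a minimal illustration, assuming an output MessageChannel bound to Kafka by the Spring Cloud Stream binder; the class and method names here (BatchSender, process) are made up for the example, while KafkaIntegrationHeaders.FLUSH and MessageBuilder are the real Spring Integration / Spring Messaging APIs:

    ```java
    import java.util.List;

    import org.springframework.integration.kafka.support.KafkaIntegrationHeaders;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.support.MessageBuilder;

    public class BatchSender {

        private final MessageChannel output;

        public BatchSender(MessageChannel output) {
            this.output = output;
        }

        public void process(List<String> input) {
            for (int i = 0; i < input.size(); i++) {
                boolean last = (i == input.size() - 1);
                // Spring Messages are immutable, so the FLUSH header must be
                // set at build time; true on the last message of the batch
                // causes the handler to call producer.flush() after the send.
                Message<String> message = MessageBuilder
                        .withPayload(input.get(i))
                        .setHeader(KafkaIntegrationHeaders.FLUSH, last)
                        .build();
                output.send(message);
            }
        }
    }
    ```

    Because send() on the flushed message blocks until the producer has flushed, a broker-side failure surfaces in this calling thread, so a surrounding try/catch in process() would see it.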