Tags: spring-cloud, spring-kafka

How to recover from exceptions sent by producer.send() in Spring Cloud Stream


We experienced the following scenario:

  • We have a Kafka cluster composed of 3 nodes, each topic created has 3 partitions
  • A message is sent through MessageChannel.send(), producing a record for, let's say, partition 1
  • The broker acting as the partition leader for that partition fails

By default, MessageChannel.send() returns true and doesn't throw any exception, even if the KafkaProducer eventually fails to send the message. About 30 seconds after the call, we observe the following message in the logs: Expiring 10 record(s) for helloworld-topic-1 due to 30008 ms has passed since batch creation plus linger time

In our case, this is not acceptable: we need to be sure that every message has been delivered to Kafka by the time the call to MessageChannel.send() returns.

We set spring.cloud.stream.kafka.bindings.<channelName>.producer.sync to true, which does exactly what the documentation describes: it blocks the caller until the producer acknowledges the success or the failure of the delivery (MessageTimeoutException, InterruptedException, ExecutionException), all of this handled by KafkaProducerMessageHandler. This seems to be the best approach for us, as the performance impact is negligible in our case.
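For reference, this is roughly how that binding looks in application.properties (the channel name output and the topic name are placeholders for your own binding):

```properties
# Hypothetical channel name "output"; substitute your own producer channel.
spring.cloud.stream.bindings.output.destination=helloworld-topic
spring.cloud.stream.kafka.bindings.output.producer.sync=true
```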

But do we need to take care of retries ourselves if an exception is thrown (in our client code, with @Retryable for instance)?

Here is a simple project to experiment with: https://github.com/phdezann/spring-cloud-bus-kafka-helloworld


Solution

  • If the send() is performed on the @StreamListener thread and the exception is thrown back to the binder, the binder retry configuration will perform retries.

    However, since you are doing the send on an HTTP thread, you will need to do your own retry: call send() within the scope of a RetryTemplate, or make the controller method @Retryable, as in the sketch below.
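For example, a minimal sketch of the @Retryable approach (the controller class, endpoint, and channel names are hypothetical; it assumes @EnableBinding(Source.class) and @EnableRetry are declared on the application class, and that producer.sync=true so the send throws on delivery failure):

```java
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Recover;
import org.springframework.retry.annotation.Retryable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {

    private final Source source;

    public HelloController(Source source) {
        this.source = source;
    }

    // Retry the whole send up to 3 times with a 1s backoff before giving up.
    @Retryable(maxAttempts = 3, backoff = @Backoff(delay = 1000))
    @PostMapping("/hello")
    public ResponseEntity<Void> publish(@RequestBody String payload) {
        // With producer.sync=true this blocks until the broker acks and
        // throws (e.g. MessageTimeoutException) if delivery fails in time.
        source.output().send(MessageBuilder.withPayload(payload).build());
        return ResponseEntity.accepted().build();
    }

    // Invoked once all retry attempts are exhausted.
    @Recover
    public ResponseEntity<Void> recover(Exception e, String payload) {
        return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();
    }
}
```

The same effect can be achieved programmatically by wrapping the send() call in retryTemplate.execute(...) if you prefer not to use annotations.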