My application processes Kafka messages and, for each one, produces a number of outbound messages on different topics. I use MANUAL acks.
The processing (DB access and creation of the outgoing messages) is synchronous, but I want to take advantage of batching in the producers, so I use KafkaTemplate.send on each outbound topic, combine the futures with CompletableFuture.allOf, and attach a CompletionStage which acknowledges the incoming Kafka message that produced these outgoing messages.
This guarantees that I will process the incoming message again if my service crashes before the outgoing messages are all delivered.
Unfortunately, different incoming messages may produce outgoing messages on different sets of topics. So I believe that a later message may have all its messages sent before an earlier message, depending on the timing of the batches on different outgoing topics.
This means that the later message may be acknowledged before the sending for the earlier message is complete, which of course amounts to acknowledging the earlier message too soon.
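The race described here can be reproduced without a broker. The following is a minimal sketch in plain Java, in which manually completed CompletableFuture instances stand in for the futures returned by the sends. The class name, the topic labels A and B, and the offsets are all hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class OutOfOrderAckDemo {

    /** Returns the incoming offsets in the order their "acknowledge" callbacks fired. */
    public static List<Long> run() {
        List<Long> ackOrder = new ArrayList<>();

        // Stand-ins for send futures (assumption: no real producer is involved).
        // Incoming offset 0 fans out to topics A and B; offset 1 only to topic B.
        CompletableFuture<Void> offset0ToTopicA = new CompletableFuture<>();
        CompletableFuture<Void> offset0ToTopicB = new CompletableFuture<>();
        CompletableFuture<Void> offset1ToTopicB = new CompletableFuture<>();

        // Same shape as the listener: acknowledge once all sends for a message are done.
        CompletableFuture.allOf(offset0ToTopicA, offset0ToTopicB)
                .thenRun(() -> ackOrder.add(0L));
        CompletableFuture.allOf(offset1ToTopicB)
                .thenRun(() -> ackOrder.add(1L));

        // Topic B's batch happens to be delivered first...
        offset0ToTopicB.complete(null);
        offset1ToTopicB.complete(null);
        // ...and topic A's batch only afterwards.
        offset0ToTopicA.complete(null);

        return ackOrder;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 0]
    }
}
```

Offset 1 is acknowledged before offset 0, which is exactly the ordering that would commit past a message whose sends are still in flight.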
If I synchronously join() the future before processing the next incoming message, my throughput falls by a factor of 2 or 3, even with linger.ms = 0.
This feels like a problem which other people would have come across. Is there a standard approach? I can tolerate duplicate messages.
Consider setting the container property asyncAcks; this causes the container to defer out-of-order commits until any "gaps" are filled.
If there are still gaps when all records from a poll have been processed, the consumer is paused until those gaps are filled.
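To make the idea of deferring commits until gaps are filled concrete, here is a rough model in plain Java. It is not Spring Kafka's implementation; the class and method names are hypothetical, and it tracks a single partition only.

```java
import java.util.TreeSet;

/** Toy model of deferred out-of-order commits for one partition (illustrative only). */
public class DeferredCommitTracker {

    // Offsets that were acknowledged but cannot be committed yet because of a gap.
    private final TreeSet<Long> pending = new TreeSet<>();
    // Lowest offset that has not been acknowledged yet; also the next offset to commit.
    private long nextToCommit;

    public DeferredCommitTracker(long firstOffset) {
        this.nextToCommit = firstOffset;
    }

    /**
     * Records an acknowledgment. Returns the offset that may now be committed
     * (the offset of the next record to read), or -1 if an earlier offset is
     * still unacknowledged and the commit has to wait.
     */
    public long ack(long offset) {
        pending.add(offset);
        long before = nextToCommit;
        // Advance over every contiguous acknowledged offset.
        while (pending.remove(nextToCommit)) {
            nextToCommit++;
        }
        return nextToCommit > before ? nextToCommit : -1;
    }

    /** True while some acknowledged offset is waiting for an earlier one. */
    public boolean hasGaps() {
        return !pending.isEmpty();
    }
}
```

With a tracker starting at offset 0, acknowledging offsets 1 and 2 first yields no commit; acknowledging offset 0 afterwards allows a single commit of offset 3.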
https://docs.spring.io/spring-kafka/docs/current/reference/html/#ooo-commits
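A configuration along these lines might look as follows. This is a sketch under assumptions, not a drop-in solution: it assumes Spring Kafka 3.x (where KafkaTemplate.send returns a CompletableFuture), String keys and values, and hypothetical topic names ("inbound", "outbound-a", "outbound-b"), group id, and class names. Error handling for failed sends is omitted.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Configuration
class AsyncAckConfig {

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);

        ContainerProperties props = factory.getContainerProperties();
        // Manual acks, as in the question.
        props.setAckMode(ContainerProperties.AckMode.MANUAL);
        // Let the container defer out-of-order commits until gaps are filled.
        props.setAsyncAcks(true);
        return factory;
    }
}

@Component
class ForwardingListener {

    private final KafkaTemplate<String, String> template;

    ForwardingListener(KafkaTemplate<String, String> template) {
        this.template = template;
    }

    @KafkaListener(topics = "inbound", groupId = "forwarder")
    public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
        // Hypothetical fan-out; the real set of topics depends on the message.
        List<CompletableFuture<?>> sends = List.of(
                template.send("outbound-a", record.key(), record.value()),
                template.send("outbound-b", record.key(), record.value()));

        // Acknowledge only after every send has completed successfully.
        // A failed send leaves the record unacknowledged; real code must handle that.
        CompletableFuture.allOf(sends.toArray(new CompletableFuture[0]))
                .thenRun(ack::acknowledge);
    }
}
```

The listener returns without blocking on join(), so the producer can keep batching, while the container takes care of committing offsets in order.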