java, apache-kafka, spring-kafka

What acknowledgment strategy can I use when each incoming Kafka message produces messages on multiple topics, and I want to batch the sends?


My application processes Kafka messages and, for each one, produces a number of outbound messages on different topics. I use MANUAL acks.

The processing (DB access and creation of the outgoing messages) is synchronous, but I want to take advantage of batching in the producers. So I call KafkaTemplate.send for each outbound message, combine the resulting futures with CompletableFuture.allOf, and attach a completion stage that acknowledges the incoming Kafka message which produced those outgoing messages.

This guarantees that I will process the incoming message again if my service crashes before the outgoing messages are all delivered.
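The acknowledge-after-all-sends pattern described above can be sketched roughly as follows. This is a minimal illustration using only the JDK, not the asker's actual code: the class AckAfterAllSends and the method ackWhenAllSent are hypothetical names, the futures stand in for whatever KafkaTemplate.send returns (assuming a version where that is a CompletableFuture), and the Runnable stands in for a call such as acknowledgment::acknowledge.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Sketch: acknowledge an inbound record only after all outbound sends complete. */
final class AckAfterAllSends {

    private AckAfterAllSends() {
    }

    /**
     * Hypothetical helper. 'sends' are the futures returned by the (already started)
     * outbound sends; 'ack' is assumed to acknowledge the inbound record.
     */
    static CompletableFuture<Void> ackWhenAllSent(
            List<? extends CompletableFuture<?>> sends, Runnable ack) {
        CompletableFuture<?>[] all = sends.toArray(new CompletableFuture<?>[0]);
        // thenRun fires only if every send completed normally. If any send fails,
        // the record is never acknowledged, so it is redelivered after a restart.
        return CompletableFuture.allOf(all).thenRun(ack);
    }
}
```

Nothing here blocks the listener thread, which is what lets the producer batch sends across several inbound records; it is also why acknowledgments can arrive out of order.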

Unfortunately, different incoming messages may produce outgoing messages on different sets of topics. So I believe that a later message may have all of its outgoing messages sent before an earlier message does, depending on the timing of the batches on the different outgoing topics.

This means that the later message may be acknowledged before the sending for the earlier message is complete. Because committing an offset covers every earlier offset in the same partition, this amounts to acknowledging the earlier message too soon.

If I instead call join() on the combined future before processing the next incoming message, my throughput drops by a factor of 2 to 3, even with linger.ms = 0.

This feels like a problem which other people would have come across. Is there a standard approach? I can tolerate duplicate messages.


Solution

  • Consider setting the container property asyncAcks (it applies with the MANUAL and MANUAL_IMMEDIATE ack modes); this causes the container to defer out-of-order commits until any "gaps" in the acknowledged offsets are filled.

    If there are still gaps when all records from a poll have been processed, the consumer is paused until those gaps are filled.

    https://docs.spring.io/spring-kafka/docs/current/reference/html/#ooo-commits
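To make the "gaps" idea concrete, here is a minimal JDK-only sketch of the kind of bookkeeping that deferring out-of-order commits implies for a single partition. It is an illustration under stated assumptions, not Spring's implementation: the class ContiguousOffsetTracker and its methods are hypothetical, and with asyncAcks enabled the container is expected to do the equivalent work itself, so application code should not need anything like this.

```java
import java.util.TreeSet;

/** Illustration only: tracks out-of-order acknowledgments for one partition. */
final class ContiguousOffsetTracker {

    // Offsets that were acknowledged while an earlier offset was still outstanding.
    private final TreeSet<Long> pending = new TreeSet<>();

    // Lowest offset that has not been acknowledged yet.
    private long nextToCommit;

    ContiguousOffsetTracker(long firstOffset) {
        this.nextToCommit = firstOffset;
    }

    /**
     * Records an acknowledgment. Returns the offset that is now safe to commit
     * (last contiguous acknowledged offset + 1, following Kafka's convention),
     * or -1 if an earlier offset is still outstanding and the commit is deferred.
     */
    synchronized long ack(long offset) {
        if (offset < nextToCommit) {
            return -1; // already covered by an earlier commit
        }
        pending.add(offset);
        long before = nextToCommit;
        // Advance past every acknowledged offset that is now contiguous.
        while (pending.remove(nextToCommit)) {
            nextToCommit++;
        }
        return nextToCommit > before ? nextToCommit : -1;
    }

    /** True while at least one acknowledged offset is waiting behind a gap. */
    synchronized boolean hasGaps() {
        return !pending.isEmpty();
    }
}
```

For example, if offsets 1 and 2 are acknowledged before offset 0, nothing is committable until 0 arrives, at which point the tracker reports 3 as the offset to commit. A crash before that point leaves offset 0 uncommitted, so the earlier record is redelivered, which is acceptable when duplicates can be tolerated.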