Search code examples
springspring-bootapache-kafkaspring-kafkaspring-boot-3

Spring Boot 3 with Kafka acknowledge callback not working


I'm testing Spring Boot 3 with spring-kafka. I would like to be notified (similar to a callback) when the sent message has been processed (when the acknowledgment is executed in the consumer). Currently, the message 'Message sent successfully' is being printed without waiting for the message ACK. Here's the code I'm using:

MessageController:

@RestController
@RequestMapping("/messages")
@RequiredArgsConstructor
@Slf4j
public class MessageController {

  private final KafkaTemplate<Object, String> kafkaTemplate;

  @GetMapping
  public void sendMessage() {
    CompletableFuture<SendResult<Object, String>> future = kafkaTemplate.send("message-topic", "message");
    future.whenComplete((result, ex) -> {
      if (ex == null) {
        log.info("Message sent successfully {}", result);
      } else {
        log.error("Error sending message", ex);
      }
    });
  }

  @KafkaListener(id = "message-topic-listener", topics = "message-topic")
  public void messageListener(@Payload String message, Acknowledgment acknowledgment) throws InterruptedException {
    Thread.sleep(10000L);
    log.info("Processing message {}", message);
    acknowledgment.acknowledge();
  }
}

application.yml

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:29092
    listener:
      ack-mode: MANUAL
    consumer:
      enable-auto-commit: false
      auto-offset-reset: earliest
      bootstrap-servers: localhost:29092

Solution

  • Believe I have found the answer to my own question above, the 'whenComplete' method of the 'SendResult' class is executed when the message is successfully written/replicated to the partition, not on the consumer's commit event.