Search code examples
spring-bootspring-cloud-streamspring-amqpspring-rabbit

Spring AMQP - How to confirm that a message is delivered and Routed successfully?


I am looking for a way to delivery a message, and once the message is delivered (and routed) successfully, i need to perform some operations.

I have enabled publisher confirms and returns by:

spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true

I have configured return and confirm callback on the rabbit template:

rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
  System.out.println("Message returned");
});
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
  System.out.println("confirm"); //correlationData.returnedMessage has the original message
});

Here is my publish code:

CorrelationData crd = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("X-ORDERS", "ORDER_PLACED", request, crd);

crd.getFuture().addCallback(new ListenableFutureCallback<Confirm>() {
  @Override
  public void onFailure(Throwable throwable) {
    log.info("Failure received");
  }

  @Override
  public void onSuccess(Confirm confirm) {
    if(confirm.isAck()){
    log.info("Success received");
    doSomethingAfterSuccess();
  }}
});

Now, when i publish a message that is unable to route the message :-

  1. rabbitTemplate's returnCallBack AND confirmCallBack are also being
    called

  2. the onSuccess(..) of the correlationData is still called with isAck() = true

So, how can I check if the message is delivered successfully and routed?

EDIT: Found solution. The publish code :

CorrelationData crd = new CorrelationData(UUID.randomUUID().toString());
    rabbitTemplate.convertAndSend("X-ORDERS", "ORDER_PLACED", request, crd);

    crd.getFuture().addCallback(new ListenableFutureCallback<Confirm>() {
      @Override
      public void onFailure(Throwable throwable) {
        log.info("Failure received");
      }

      @Override
      public void onSuccess(Confirm confirm) {
        if(confirm.isAck() && crd.getReturnedMessage == null){
        log.info("Success received");
        doSomethingAfterSuccess();
      }}
    });

basically changed the condition in onSuccess to "confirm.isAck() && crd.getReturnedMessage == null"


Solution

  • That is per the RabbitMQ documentation - you still get a positive ack, but it is guaranteed to be delievered after the return.

    So simply check that the future.returnedMessage is not null in onSuccess().

    See the documentation.

    In addition, when both confirms and returns are enabled, the CorrelationData is populated with the returned message. It is guaranteed that this occurs before the future is set with the ack.