Search code examples
rabbitmqspring-amqpspring-rabbit

RabbitMQ be sure message reaches a queue


I want to be sure the message is reaching a queue. Otherwise, I want an exception.

I have tried publisher returns, but it is not what I need, because it is on a different thread and I think it would be tricky to somehow wait for it on the thread sent the message.

Without the transacted channel, the convertAndSend method returned successfully when the exchange did not be there, with the transacted channel now it throws an exception.

What I need is the same when there is no route based on the routing key.

@SpringBootApplication
public class DemoApplication {

    private static final Logger log = Logger.getGlobal();

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info(replyCode + "," + replyText));
        rabbitTemplate.setChannelTransacted(true);
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }

    @Bean
    CommandLineRunner commandLineRunner(RabbitTemplate rabbitTemplate) {
        return args -> {
            rabbitTemplate.convertAndSend("exchangeName", "routingKey", "message");
            log.info("Send is done.");
        };
    }
}

only property: spring.rabbitmq.publisher-returns=true

Spring boot version: 2.1.7.RELEASE

Actual:

no exchange -> convertAndSend throws exception

no route at exchange -> method returns

Expected:

no exchange -> convertAndSend throws exception

no route at exchange -> convertAndSend throws exception


Solution

  • You need to use publisher confirms and correlation data:

    spring.rabbitmq.publisher-returns=true
    spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.template.mandatory=true
    
    @SpringBootApplication
    public class So57464212Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So57464212Application.class, args);
        }
    
    
        @Bean
        public ApplicationRunner runner(RabbitTemplate template) {
            template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                System.err.println("Returned: " + replyText);
            });
            template.setConfirmCallback((correlationData, ack, cause) -> {
                System.err.println("ack:" + ack);
            });
            return args -> {
                CorrelationData correlationData = new CorrelationData("foo");
                template.convertAndSend("", "NOQUEUE", "bar", correlationData);
                correlationData.getFuture().get(10, TimeUnit.SECONDS);
                if (correlationData.getReturnedMessage() != null) {
                    throw new RuntimeException("Message was returned");
                }
            };
        }
    
    }