Search code examples
spring-bootspring-amqpspring-rabbit

Spring AMQP - Publisher confirms returns ack-ed when publish to non-existent queue


If I use publisher confirms and returns callback on RabbitMQ, I never get the returned message. From Spring AMQP docs :

- Publish to an exchange but there is no matching destination queue.
- Publish to a non-existent exchange.
The first case is covered by publisher returns, as described in Publisher Confirms and Returns.

So I think I will get returned message if I publish to exist exchange, but not-exist queue. But the return callback never called. Do I need to set something else?
I'm using RabbitMQ 3.8.0 and Spring Boot 2.2.1

application.yml

spring:
  rabbitmq:
    publisher-confirms: true
    publisher-returns: true
    template:
      mandatory: true

Producer

@Service
public class PublisherConfirmProducer {

    private static final Logger log = LoggerFactory.getLogger(PublisherConfirmProducer.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    private void postConstruct() {
        this.rabbitTemplate.setConfirmCallback((correlation, ack, reason) -> {
            if (correlation != null) {
                log.info("Received " + (ack ? " ack " : " nack ") + "for correlation: " + correlation);
            }
        });

        this.rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("Returned: " + message + "\nreplyCode: " + replyCode + "\nreplyText: " + replyText
                    + "\nexchange/rk: " + exchange + "/" + routingKey);
        });
    }

    // Careful : will be silently dropped, since the exchange is exists, but no
    // route to queue, but ack-ed. How to know that I publish to non-existing queue?
    public void sendMessage_ValidExchange_InvalidQueue(DummyMessage message) {
        CorrelationData correlationData = new CorrelationData("Correlation for message " + message.getContent());
        this.rabbitTemplate.convertAndSend("x.test", "not-valid-routing-key", message, correlationData);
    }

}

Main app

@SpringBootApplication
public class RabbitmqProducerTwoApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqProducerTwoApplication.class, args);
    }

    @Autowired
    private PublisherConfirmProducer producer;

    @Override
    public void run(String... args) throws Exception {
        var dummyMessage_2 = new DummyMessage("Message 2", 2);
        producer.sendMessage_ValidExchange_InvalidQueue(dummyMessage_2);
    }

}

Log result

2019-11-29 04:45:23.796  INFO 8352 --- [           main] c.c.r.RabbitmqProducerTwoApplication     : Starting RabbitmqProducerTwoApplication on timpamungkas with PID 8352 (D:\workspace\eclipse\my-courses\rabbitmq-1.2\rabbitmq-producer-two\bin\main started by USER in D:\workspace\eclipse\my-courses\rabbitmq-1.2\rabbitmq-producer-two)
2019-11-29 04:45:23.800  INFO 8352 --- [           main] c.c.r.RabbitmqProducerTwoApplication     : No active profile set, falling back to default profiles: default
2019-11-29 04:45:24.952  INFO 8352 --- [           main] c.c.r.RabbitmqProducerTwoApplication     : Started RabbitmqProducerTwoApplication in 1.696 seconds (JVM running for 3.539)
2019-11-29 04:45:24.990  INFO 8352 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2019-11-29 04:45:25.024  INFO 8352 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#599f571f:0/SimpleConnection@86733 [delegate=amqp://[email protected]:5672/, localPort= 50688]
2019-11-29 04:45:25.058  INFO 8352 --- [nectionFactory1] c.c.r.producer.PublisherConfirmProducer  : Received  ack for correlation: CorrelationData [id=Correlation for message Message 2]

Edited for Gary

RabbitMqConfig.java

@Configuration
public class RabbitmqConfig {

    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter converter) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(converter);
        return template;
    }

}

Solution

  • You will receive a positive ack after a returned message.

    What you have looks correct to me. It is very similar to this sample, so should work the same way.

    What type of exchange is x.test? What queues are bound to it, and with what routing keys?

    If you still can't get it to work after looking at that sample, post the project someplace and I'll take a look.