Tags: java, rabbitmq, spring-amqp, spring-rabbit

RabbitMQ Spring Java - RPC Pattern


I have several Spring applications that communicate with each other using RabbitMQ as the broker. They can already send and receive messages asynchronously. Now I need one application to send a message to another and wait for the response, so I am trying to implement the RPC pattern. It works, but only with the temporary queues that Spring generates.


https://www.rabbitmq.com/tutorials/tutorial-six-spring-amqp.html

This is the code that sends the message and waits for the response:

public void send() {
    ....
    Integer response = (Integer) template.convertSendAndReceive(exchange.getName(), "rpc", "message");
    ...
}

When I send the message, execution blocks until the response is received, and Spring creates a temporary queue for the response, as expected.

What I need instead is a specific, fixed queue, defined by me, to receive the responses. The responses should be sent to an exchange with a routing key that points to the fixed response queue (this way I can also route them to a second queue that logs all responses).

I tried setting the replyTo property on the message (via "setReplyTo"), but it is not working.


Solution

  • What version are you using? With modern versions, direct reply_to is used by default, but you can revert to using a temporary queue by setting a property on the template.

    https://docs.spring.io/spring-amqp/docs/current/reference/html/#direct-reply-to

    To use a named reply queue, see the documentation about how to set up a reply container, with the template as the message listener:

    https://docs.spring.io/spring-amqp/docs/current/reference/html/#message-correlation-with-a-reply-queue

    and

    https://docs.spring.io/spring-amqp/docs/current/reference/html/#reply-listener

    EDIT

    The template will block until the corresponding reply is passed into it by the reply container (or it times out).

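    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;

    @SpringBootApplication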
    public class So68986604Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So68986604Application.class, args);
        }
    
        @RabbitListener(queues = "foo")
        public String listen(String in) {
            System.out.println(in);
            return in.toUpperCase();
        }
    
        @Bean
        Queue foo() {
            return new Queue("foo");
        }
    
        @Bean
        Queue replies() {
            return new Queue("foo.replies");
        }
    
        @Bean
        SimpleMessageListenerContainer replyContainer(ConnectionFactory cf, RabbitTemplate template) {
            // Container that consumes the fixed reply queue and hands each reply to the template.
            SimpleMessageListenerContainer replyer = new SimpleMessageListenerContainer(cf);
            replyer.setQueueNames("foo.replies");
            replyer.setMessageListener(template);
            // Requests are sent with this queue as the reply address.
            template.setReplyAddress("foo.replies");
            return replyer;
        }
    
        @Bean
        public ApplicationRunner runner(RabbitTemplate template) {
            return args -> {
                System.out.println(template.convertSendAndReceive("", "foo", "test"));
            };
        }
    }
    
    Output (the listener prints the request, then the runner prints the reply):

    test
    TEST
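
To illustrate the statement in the EDIT that the template blocks until the corresponding reply is passed into it (or it times out), here is a minimal plain-Java sketch of that correlation idea. It is not Spring AMQP code: the class `ReplyCorrelationSketch`, its methods `sendAndReceive` and `onReply`, and the `BiConsumer` standing in for the broker are all hypothetical names made up for this example, and returning `null` on timeout is simply a choice made in the sketch.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

// Plain-Java illustration only; none of these names are Spring AMQP API.
public class ReplyCorrelationSketch {

    // Requests still waiting for a reply, keyed by correlation id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final long replyTimeoutMillis;

    public ReplyCorrelationSketch(long replyTimeoutMillis) {
        this.replyTimeoutMillis = replyTimeoutMillis;
    }

    // Sends a request and blocks the caller until the matching reply arrives.
    // Returns null if no reply arrives within the timeout.
    public String sendAndReceive(String body, BiConsumer<String, String> sender) throws Exception {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        try {
            sender.accept(correlationId, body); // stands in for publishing to the broker
            return future.get(replyTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null;
        } finally {
            pending.remove(correlationId);
        }
    }

    // Called by the "reply container" thread for each message on the fixed reply queue.
    public void onReply(String correlationId, String body) {
        CompletableFuture<String> future = pending.get(correlationId);
        if (future != null) {
            future.complete(body); // wakes up the thread blocked in sendAndReceive
        }
        // Replies with an unknown or expired correlation id are dropped.
    }

    public static void main(String[] args) throws Exception {
        ReplyCorrelationSketch template = new ReplyCorrelationSketch(1000);
        // Fake server: upper-cases the request and replies from another thread.
        BiConsumer<String, String> server = (id, body) ->
                new Thread(() -> template.onReply(id, body.toUpperCase())).start();
        System.out.println(template.sendAndReceive("test", server)); // prints TEST
    }
}
```

Because every request carries its own correlation id, several callers can share one fixed reply queue: each blocked caller is released only by the reply that carries its id, and late replies for requests that already timed out are ignored.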