I have several Spring applications that communicate with each other using RabbitMQ as the broker. I can send and receive messages asynchronously between them. Now I need one application to send a message to another and wait for the response, so I am trying to implement the RPC pattern. It works, but so far only with the temporary queues generated by Spring.
https://www.rabbitmq.com/tutorials/tutorial-six-spring-amqp.html
This is the code that sends the message and waits for the response:
public void send() {
    ....
    Integer response = (Integer) template.convertSendAndReceive(exchange.getName(), "rpc", "message");
    ...
}
When I send the message, Spring creates a temporary queue for the response and execution blocks until the response is received, as expected.
But what I need is a specific, fixed queue, defined by me, to receive the responses. The responses should be sent to an exchange with a routing key that points to the fixed response queue (that way I can also route the responses to a second queue that logs all of them).
I tried calling "setReplyTo" on the message, but it is not working.
What version are you using? With modern versions, direct reply-to is used by default, but you can revert to using a temporary queue by setting the useTemporaryReplyQueues property on the template to true.
https://docs.spring.io/spring-amqp/docs/current/reference/html/#direct-reply-to
To use a named reply queue, see the documentation about how to set up a reply container, with the template as the message listener:
and
https://docs.spring.io/spring-amqp/docs/current/reference/html/#reply-listener
EDIT
The template will block until the corresponding reply is passed into it by the reply container (or it times out).
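import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class So68986604Application {

    public static void main(String[] args) {
        SpringApplication.run(So68986604Application.class, args);
    }

    // Server side: consumes requests from "foo"; the return value is sent back as the reply.
    @RabbitListener(queues = "foo")
    public String listen(String in) {
        System.out.println(in);
        return in.toUpperCase();
    }

    @Bean
    Queue foo() {
        return new Queue("foo");
    }

    // Fixed, named reply queue.
    @Bean
    Queue replies() {
        return new Queue("foo.replies");
    }

    // Reply container: consumes from the reply queue and hands each reply to the template.
    @Bean
    SimpleMessageListenerContainer replyContainer(ConnectionFactory cf, RabbitTemplate template) {
        SimpleMessageListenerContainer replyer = new SimpleMessageListenerContainer(cf);
        replyer.setQueueNames("foo.replies");
        replyer.setMessageListener(template);
        template.setReplyAddress("foo.replies");
        return replyer;
    }

    // Client side: sends "test" to "foo" via the default exchange and blocks for the reply.
    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            System.out.println(template.convertSendAndReceive("", "foo", "test"));
        };
    }

}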
test
TEST
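To make the blocking behaviour more concrete, here is a minimal plain-Java sketch of the idea: the caller registers a pending request under a correlation id and blocks, and a separate reply consumer completes that request when the matching reply arrives on the fixed reply queue. It uses only the JDK and is not Spring AMQP's actual implementation; the class name ReplyCorrelationSketch, the in-memory queues standing in for "foo" and "foo.replies", and the methods sendAndReceive, onReply and start are all hypothetical names chosen for illustration.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only: in-memory queues stand in for RabbitMQ.
class ReplyCorrelationSketch {

    // Requests waiting for a reply, keyed by correlation id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Stand-ins for the request queue ("foo") and the fixed reply queue ("foo.replies").
    // Each message is a two-element array: {correlationId, body}.
    private final BlockingQueue<String[]> requests = new LinkedBlockingQueue<>();
    private final BlockingQueue<String[]> replies = new LinkedBlockingQueue<>();

    // Caller side: publish a request, then block until the reply arrives or the timeout expires.
    String sendAndReceive(String payload, long timeoutMillis) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        try {
            requests.put(new String[] {correlationId, payload});
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null; // this sketch returns null when no reply arrives in time
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pending.remove(correlationId);
        }
    }

    // Reply side: invoked by the reply consumer for every message on the reply queue.
    void onReply(String correlationId, String body) {
        CompletableFuture<String> future = pending.get(correlationId);
        if (future != null) {
            future.complete(body); // wakes up the blocked caller
        }
        // A reply with no pending request (for example, one that arrives late) is dropped.
    }

    // Starts the "server" (upper-cases requests) and the reply consumer as daemon threads.
    void start() {
        Thread server = new Thread(() -> {
            try {
                while (true) {
                    String[] request = requests.take();
                    replies.put(new String[] {request[0], request[1].toUpperCase()});
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread replyConsumer = new Thread(() -> {
            try {
                while (true) {
                    String[] reply = replies.take();
                    onReply(reply[0], reply[1]);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.setDaemon(true);
        replyConsumer.setDaemon(true);
        server.start();
        replyConsumer.start();
    }

    public static void main(String[] args) {
        ReplyCorrelationSketch rpc = new ReplyCorrelationSketch();
        rpc.start();
        System.out.println(rpc.sendAndReceive("test", 2000)); // prints TEST
    }
}
```

Because all replies arrive on one shared queue, the correlation id is what ties each reply to its waiting caller, so the responder has to echo it back unchanged.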