
Durable RPC with Spring Integration and RabbitMQ


I have created a simple message sender and consumer and linked them together using Spring Integration and RabbitMQ.

I have used the outbound-gateway and inbound-gateway to get RPC-style (request/response) messaging. I have not specified a reply-channel, as I am leaving this up to Spring to create (I am assuming it will create an anonymous queue for the reply).

This works just fine when the consumer is started before the sender, but I am not receiving a response if the consumer is started after the sender. I can see that the consumer receives the message but no response is returned.

On further inspection, I can see that a temporary anonymous queue is created in RabbitMQ when the sender sends the message, and the message carries this queue name in its reply-to header. This queue, however, disappears shortly after it is created, before I start up the consumer. I'm guessing that because the queue no longer exists, the consumer cannot publish a response to it.

I can see from the RabbitMQ admin tool that the anonymous queue is exclusive and has auto-delete set to true. I have no control over these properties, though, as the queue is created by Spring Integration.

Does anyone have any ideas of how to solve this? My config is as follows:

Sender:

<import resource="classpath:rabbit.xml" />

<int:channel id="output" />

<int:gateway id="senderGateway" service-interface="gordon.outbound.SenderGateway" default-request-channel="output"/>

<int-amqp:outbound-gateway request-channel="output"
                           amqp-template="amqpTemplate"
                           exchange-name="silly-wabbit-exchange"
                           routing-key="silly-wabbit-key"/>

Consumer:

<import resource="classpath:rabbit.xml" />

<int:channel id="input"/>

<int-amqp:inbound-gateway request-channel="input" queue-names="silly-wabbit-queue" connection-factory="connectionFactory"/>

<bean id="listenerService" class="gordon.inbound.ListenerService"/>

<int:service-activator input-channel="input" ref="listenerService" method="receiveMessage"/>

Solution

  • The default replyTimeout is 5 seconds. If the consumer doesn't reply within 5 seconds, the outbound gateway's consumer on the reply queue is canceled, which removes the temporary queue.

    You can increase the timeout by setting the reply-timeout attribute (in milliseconds) on the <rabbit:template/>.
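
    As a minimal sketch of that setting: the snippet below assumes that rabbit.xml (not shown in the question) is where the amqpTemplate and connectionFactory beans are declared, and that the rabbit namespace is already mapped in that file. The 60000 value is purely illustrative; pick one that covers the longest delay you expect before the consumer comes up.

```xml
<!-- Assumed to live in rabbit.xml; bean ids are taken from the sender config above. -->
<!-- reply-timeout is in milliseconds; 60000 is an example value, not a recommendation. -->
<rabbit:template id="amqpTemplate"
                 connection-factory="connectionFactory"
                 reply-timeout="60000"/>
```

    Note that the sending thread stays blocked while it waits for the reply, so a longer timeout only widens the window in which a late-starting consumer can still answer. The temporary reply queue is still removed once the timeout expires.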