I am trying to implement a HTTP request/reply using separate RabbitMQ queues in Spring Integration DSL. It's similar to Spring IntegrationFlow http request to amqp queue. The difference is I want the response back to the original http caller. I could see the test http post message successfully passed to the request queue and transformed (into upper case) into the response queue. The message was consumed from the response queue as well but never returned back to the caller(http://localhost:8080/Tunner). Eventually the call timed out with 500 error. I am new to this so there could be something I totally missed. Could someone provide suggestion? The code is as follows:
public class TunnelApplication
{
public static void main(String[] args)
{
SpringApplication.run(TunnelApplication.class, args);
}
@Value("${outboundQueue}")
private String outboundQueue;
@Value("${inboundQueue}")
private String inboundQueue;
private ConnectionFactory rabbitConnectionFactory;
@Autowired
public TunnelApplication(ConnectionFactory factory) {
rabbitConnectionFactory = factory;
}
@Bean
public Queue targetQueue()
{
return new Queue(outboundQueue, true, false, true);
}
@Bean
public Queue requestQueue()
{
return new Queue(inboundQueue, true, false, true);
}
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter()
{
return new Jackson2JsonMessageConverter();
}
@Bean
public AmqpTemplate amqpTemplate()
{
RabbitTemplate result = new RabbitTemplate(rabbitConnectionFactory);
result.setMessageConverter(jsonMessageConverter());
result.setDefaultReceiveQueue(outboundQueue);
//result.setReplyAddress(outboundQueue);
result.setReplyTimeout(60000);
return result;
}
@Bean
public IntegrationFlow sendReceiveFlow(RabbitTemplate amqpTemplate) {
return IntegrationFlows
.from(Http.inboundGateway("/tunnel"))
.handle(Amqp.outboundGateway(amqpTemplate)
.routingKey(inboundQueue)
.returnChannel(amqpOutboundChannel()))
.log()
.bridge(null)
.get();
}
@Bean
public IntegrationFlow rabbitToWeb(RabbitTemplate amqpTemplate, ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, requestQueue()))
.transform(String.class, String::toUpperCase)
.log()
.handle(Amqp.outboundGateway(amqpTemplate).routingKey(outboundQueue))
.log()
.bridge(null)
.get();
}
@Bean
public IntegrationFlow replyBackToHttp(RabbitTemplate amqpTemplate, ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, targetQueue()))
.handle(Http.outboundGateway("http://localhost:8080/tunnel")
.expectedResponseType(String.class))
.log()
.bridge(null)
.channel(amqpOutboundChannel())
.get();
}
@Bean
public MessageChannel amqpOutboundChannel() {
return new DirectChannel();
}
We have also tried the following code (by my coworker) and we didn't get the response either:
@Configuration
@EnableIntegration
public class FlowConfig {
@Value("${routingKey}")
private String routingKey;
@Value("${rabbitSinkChannel}")
private String rabbitSinkChannel;
@Bean
public MessageChannel rabbitSinkChannel(ConnectionFactory connectionFactory) {
return
Amqp
.channel(rabbitSinkChannel, connectionFactory)
.get();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
@Bean
public IntegrationFlow httpFlow(RabbitTemplate rabbitTemplate, ConnectionFactory connectionFactory) {
MessageChannel rabbitSinkChannel = rabbitSinkChannel(connectionFactory);
return IntegrationFlows
.from(
Http.inboundGateway("/sendreceive")
)
.handle(
Amqp.outboundGateway(rabbitTemplate)
.routingKey(routingKey)
.returnChannel(rabbitSinkChannel)
)
.channel(rabbitSinkChannel) // or .handle? if so, what?
.get();
}
}
The following update works (I also removed the replyBackToHttp() method):
@Bean
public AmqpTemplate amqpTemplate()
{
RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
rabbitTemplate.setMessageConverter(jsonMessageConverter());
//result.setDefaultReceiveQueue(outboundQueue);
rabbitTemplate.setReplyAddress(outboundQueue);
rabbitTemplate.setReplyTimeout(60000);
rabbitTemplate.setUseDirectReplyToContainer(false);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory);
container.setQueues(replyQueue());
container.setMessageListener((MessageListener) amqpTemplate());
return container;
}