I have an integration flow that does a scatter gather operation which hits multiple HTTP endpoints returning JSON. Then aggregates the result into a single JSON object. The flow goes like this
@Bean
public IntegrationFlow myFlow(IMyService myService, IMyOtherService myOtherService) {
return f -> f.enrichHeaders(eh -> eh.headerExpression(Headers.PAYLOAD, "payload"))
.handle(HeaderPrinter::headerPrinter)
.enrichHeaders(httpRequestHeaderEnricher())
.scatterGather(
scatterer -> scatterer.recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint1"))
.handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
.recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint2"))
.handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
.applySequence(true),
gatherer -> gatherer.outputProcessor(MyFlows::aggregateJsonFromMultipleSources)
)
.handle(myService, "handleAggregatedJson")
.handle(HeaderPrinter::headerPrinter)
.handle(myOtherService, "handleMyServiceOutput")
.channel("myFlow.output");
}
I'm starting the flow using a gateway declared as follows
@MessagingGateway
public interface IMyGateway {
@Gateway(requestChannel = "myFlow.input", replyChannel = "myFlow.output")
MyResult startFlow(@Payload String payload, @Header("header1") String header1, @Header("header2") String header2);
}
The problem I have is that the whole flow blocks and the gateway times-out. I've put breakpoints in the two service calls IMyService::handleAggregatedJson and IMyOutherService::handleMyServiceResult and they are both running, but the output never reaches the reply channel of the gateway. If I remove both of the last two handle operations then the flow returns a result normally via the gateway.
I've looked into the stack-trace while the flow is blocked and I can see that the thread running the flow is waiting on a lock
java.lang.Thread.State: WAITING at sun.misc.Unsafe.park(Unsafe.java:-1) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) at org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel.receive(GenericMessagingTemplate.java:308) at org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel.receive(GenericMessagingTemplate.java:300) at org.springframework.messaging.core.GenericMessagingTemplate.doReceive(GenericMessagingTemplate.java:201) at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:234) at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:47) at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:45) at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97) at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:38) at org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive(AbstractMessagingTemplate.java:95) at org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive(AbstractMessagingTemplate.java:85) at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:487) at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceive(MessagingGatewaySupport.java:461) at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:520) at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:469) at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:460) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) at com.sun.proxy.$Proxy116.startFlow(Unknown Source:-1)
From what I suspect if the flow takes more than X time then it will block. I tried putting a rendezvous channel between the flow and the gateway but it didn't seem to work.
Any ideas on what's causing the timeout problem?
Addendum: I've been fiddling a bit with the code and removing the return type on the Gateway and the last .channel call on the flow does seem to stop blocking it.
The following works fine
@Bean
public IntegrationFlow myFlow(IMyService myService, IMyOtherService myOtherService) {
return f -> f.enrichHeaders(eh -> eh.headerExpression(Headers.PAYLOAD, "payload"))
.handle(HeaderPrinter::headerPrinter)
.enrichHeaders(httpRequestHeaderEnricher())
.scatterGather(
scatterer -> scatterer.recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint1"))
.handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
.recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint2"))
.handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
.applySequence(true),
gatherer -> gatherer.outputProcessor(MyFlows::aggregateJsonFromMultipleSources)
)
.handle(myService, "handleAggregatedJson")
.handle(HeaderPrinter::headerPrinter)
.handle(myOtherService, "handleMyServiceOutput")
.handle(m -> {
log.info("Flow completed successfully, payload as expected:" + payload);
});
}
I wonder if all your
.handle(myService, "handleAggregatedJson")
.handle(HeaderPrinter::headerPrinter)
.handle(myOtherService, "handleMyServiceOutput")
after gathering return some value. Typical error with request-reply that some step in the flow stops to reply with some reasonable value.
UPDATE
You should consider to remove explicit replyChannel
declaration from the @Gateway
definition and also remove .channel("myFlow.output")
from the end of flow. This way you should get a reply to the replyChannel
header. When you configure explicit replyChannel
, there is no guarantee that you won't have some other subscriber to this channel which is going to "steal" your reply messages.
See more info in the Reference Manual.