Tags: java, spring, spring-boot, spring-integration, spring-integration-dsl

Spring Integration DSL ScatterGather flow blocks


I have an integration flow that performs a scatter-gather operation: it hits multiple HTTP endpoints that return JSON, then aggregates the results into a single JSON object. The flow looks like this:

@Bean
public IntegrationFlow myFlow(IMyService myService, IMyOtherService myOtherService) {
return f -> f.enrichHeaders(eh -> eh.headerExpression(Headers.PAYLOAD, "payload"))
             .handle(HeaderPrinter::headerPrinter)
             .enrichHeaders(httpRequestHeaderEnricher())
             .scatterGather(
                scatterer -> scatterer.recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint1"))
                                                             .handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
                                      .recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint2"))
                                                             .handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
                                      .applySequence(true),
                gatherer -> gatherer.outputProcessor(MyFlows::aggregateJsonFromMultipleSources)
            )
            .handle(myService, "handleAggregatedJson")
            .handle(HeaderPrinter::headerPrinter)
            .handle(myOtherService, "handleMyServiceOutput")
            .channel("myFlow.output");
}

I'm starting the flow using a gateway declared as follows:

@MessagingGateway
public interface IMyGateway {

    @Gateway(requestChannel = "myFlow.input", replyChannel = "myFlow.output")
    MyResult startFlow(@Payload String payload, @Header("header1") String header1, @Header("header2") String header2);

}

The problem is that the whole flow blocks and the gateway times out. I've put breakpoints in the two service calls, IMyService::handleAggregatedJson and IMyOtherService::handleMyServiceOutput, and both of them run, but the output never reaches the reply channel of the gateway. If I remove the last two handle operations, the flow returns a result normally via the gateway.

I've looked at the stack trace while the flow is blocked, and I can see that the thread running the flow is waiting on a lock:

java.lang.Thread.State: WAITING
    at sun.misc.Unsafe.park(Unsafe.java:-1)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
    at org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel.receive(GenericMessagingTemplate.java:308)
    at org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel.receive(GenericMessagingTemplate.java:300)
    at org.springframework.messaging.core.GenericMessagingTemplate.doReceive(GenericMessagingTemplate.java:201)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:234)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:45)
    at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97)
    at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:38)
    at org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive(AbstractMessagingTemplate.java:95)
    at org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive(AbstractMessagingTemplate.java:85)
    at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:487)
    at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceive(MessagingGatewaySupport.java:461)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:520)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:469)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:460)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
    at com.sun.proxy.$Proxy116.startFlow(Unknown Source:-1)

My suspicion is that the flow blocks whenever it takes longer than some threshold. I tried putting a rendezvous channel between the flow and the gateway, but it didn't help.

Any ideas on what's causing the timeout problem?

Addendum: I've been experimenting with the code. Removing the return type from the gateway method and the final .channel call from the flow does seem to stop it from blocking.

The following works fine:

@Bean
public IntegrationFlow myFlow(IMyService myService, IMyOtherService myOtherService) {
return f -> f.enrichHeaders(eh -> eh.headerExpression(Headers.PAYLOAD, "payload"))
             .handle(HeaderPrinter::headerPrinter)
             .enrichHeaders(httpRequestHeaderEnricher())
             .scatterGather(
                scatterer -> scatterer.recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint1"))
                                                             .handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
                                      .recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint2"))
                                                             .handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
                                      .applySequence(true),
                gatherer -> gatherer.outputProcessor(MyFlows::aggregateJsonFromMultipleSources)
            )
            .handle(myService, "handleAggregatedJson")
            .handle(HeaderPrinter::headerPrinter)
            .handle(myOtherService, "handleMyServiceOutput")
            .handle(m -> {
                log.info("Flow completed successfully, payload as expected: " + m.getPayload());
            });
}

Solution

  • I wonder whether all of these steps

    .handle(myService, "handleAggregatedJson")
    .handle(HeaderPrinter::headerPrinter)
    .handle(myOtherService, "handleMyServiceOutput")

    after the gather actually return a value. A typical mistake with request-reply flows is that some step in the flow stops producing a reply.
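The point about a step that stops replying can be pictured with a toy model in plain Java. This is not Spring Integration code: ReplyLatchSketch, TemporaryReply and sendAndReceive are hypothetical names made up for illustration. The only thing taken from the question is what the stack trace shows, namely that the gateway thread waits on a CountDownLatch for a reply. In the sketch, a step that returns null ends the chain, nothing is ever sent to the reply slot, and the caller simply waits until the timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Toy model only (hypothetical names); NOT how Spring Integration is implemented.
class ReplyLatchSketch {

    // Stand-in for a temporary reply channel: a one-shot slot guarded by a latch.
    static final class TemporaryReply {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile Object reply;

        void send(Object value) {
            reply = value;
            latch.countDown();
        }

        // Returns the reply, or null if nothing arrived within the timeout.
        Object receive(long timeoutMillis) {
            try {
                return latch.await(timeoutMillis, TimeUnit.MILLISECONDS) ? reply : null;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    // Runs the steps in order. A step that returns null ends the chain,
    // so no reply is sent and the caller waits for the full timeout.
    @SafeVarargs
    static Object sendAndReceive(Object request, long timeoutMillis,
                                 Function<Object, Object>... steps) {
        TemporaryReply replyChannel = new TemporaryReply();
        Object current = request;
        for (Function<Object, Object> step : steps) {
            current = step.apply(current);
            if (current == null) {
                break; // nothing left to pass downstream
            }
        }
        if (current != null) {
            replyChannel.send(current);
        }
        return replyChannel.receive(timeoutMillis);
    }

    public static void main(String[] args) {
        // Every step returns a value: the caller gets a reply.
        System.out.println(sendAndReceive("json", 200, p -> p + "-aggregated", p -> p + "-done"));
        // The last step returns null: the caller times out and gets null.
        System.out.println(sendAndReceive("json", 50, p -> p + "-aggregated", p -> null));
    }
}
```

So for each of the three handlers it is worth checking that the method has a non-void return type and does not return null.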

    UPDATE

    Consider removing the explicit replyChannel declaration from the @Gateway definition, and also removing .channel("myFlow.output") from the end of the flow. The reply is then sent to the channel carried in the replyChannel header. When you configure an explicit replyChannel, there is no guarantee that some other subscriber to that channel won't "steal" your reply messages.

    See more info in the Reference Manual.
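As a sketch of that suggestion, the gateway from the question could be declared without replyChannel. The names IMyGateway, MyResult, myFlow.input, header1 and header2 come from the question; the rest assumes the flow's final .channel("myFlow.output") is removed as well, and that handleMyServiceOutput returns a MyResult (or something convertible to it).

```java
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

@MessagingGateway
public interface IMyGateway {

    // No explicit replyChannel here: the reply is expected to come back
    // through the replyChannel header that the gateway sets on the request.
    @Gateway(requestChannel = "myFlow.input")
    MyResult startFlow(@Payload String payload,
                       @Header("header1") String header1,
                       @Header("header2") String header2);
}
```

With this declaration, the output of the last .handle(myOtherService, "handleMyServiceOutput") step becomes the gateway's reply, so that method needs to return a value.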