spring-integration, spring-integration-dsl, spring-integration-http

Spring Integration DSL Scatter-Gather async/parallel execution for multiple recipientFlows


We are trying to make parallel calls to different recipients using scatter-gather, and functionally it works fine. However, the second recipient flow does not start until the first one has completed (traced in Zipkin). Is there a way to make all recipients async, very similar to split-aggregate with an executor channel?

public IntegrationFlow flow1() {

        return flow -> flow
                .split().channel(c -> c.executor(Executors.newCachedThreadPool()))
                .scatterGather(
                        scatterer -> scatterer
                                .applySequence(true)
                                .recipientFlow(flow2())
                                .recipientFlow(flow3())
                                .recipientFlow(flow4())
                                .recipientFlow(flow5()),
                        gatherer -> gatherer
                                .outputProcessor(messageGroup -> {
                                    Object request = gatherResponse(messageGroup);
                                    return createResponse(request);
                                }))
                .aggregate();
    }

flow2(), flow3(), flow4() and flow5() are methods with IntegrationFlow as their return type.

Sample code for flow2():

public IntegrationFlow flow2() {
        return integrationFlowDefinition -> integrationFlowDefinition
                .enrichHeaders(
                        h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
                .transform(ele -> createRequest1(ele))                  
                .wireTap("asyncXMLLogging")
                .handle(wsGateway.applyAsHandler(endpoint1))
                .transform(
                        ele -> response2(ele));
    }

Solution

  • This is indeed possible with the executor channel you mention. Each recipient flow must itself start from an ExecutorChannel; otherwise the scatterer delivers to the recipients one after another on the calling thread. In your case, modify all of them to something like this:

    public IntegrationFlow flow2() {
        return IntegrationFlows.from(MessageChannels.executor(taskExecutor()))
                .enrichHeaders(
                        h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
                .transform(ele -> createRequest1(ele))                  
                .wireTap("asyncXMLLogging")
                .handle(wsGateway.applyAsHandler(endpoint1))
                .transform(
                        ele -> response2(ele))
                .get();
    }
    

    Note the IntegrationFlows.from(MessageChannels.executor(taskExecutor())) call: starting each sub-flow from an ExecutorChannel hands the message off to another thread, and that is exactly what makes each sub-flow async.

    UPDATE

    For older Spring Integration versions, which lack the IntegrationFlow improvements for sub-flows, you can do it like this:

    public IntegrationFlow flow2() {
        return integrationFlowDefinition -> integrationFlowDefinition
                .channel(c -> c.executor(Executors.newCachedThreadPool()))
                .enrichHeaders(
                        h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
                .transform(ele -> createRequest1(ele))                  
                .wireTap("asyncXMLLogging")
                .handle(wsGateway.applyAsHandler(endpoint1))
                .transform(
                        ele -> response2(ele));
    }
    

    This is similar to what you show in the comment above.
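
To illustrate why the executor hand-off matters, here is a minimal plain-JDK sketch of the two dispatch styles. It is an analogy only and does not use the Spring Integration API: the class ScatterSketch and its methods slowRecipient, scatterSequential and scatterParallel are hypothetical names invented for this example, and Thread.sleep stands in for a slow web service call. Each reply carries the name of the thread that produced it, so you can see whether a recipient ran on the caller's thread or on a pool thread.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Hypothetical illustration class; not part of Spring Integration.
final class ScatterSketch {

    private ScatterSketch() {
    }

    // Stand-in for a slow recipient flow. The reply records the executing thread.
    static String slowRecipient(String name, String payload, long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name + ":" + payload + "@" + Thread.currentThread().getName();
    }

    // Direct hand-off: every recipient runs on the caller's thread, one after another.
    static List<String> scatterSequential(String payload, List<String> recipients, long millis) {
        return recipients.stream()
                .map(r -> slowRecipient(r, payload, millis))
                .collect(Collectors.toList());
    }

    // Executor hand-off: all recipients are submitted to the pool first,
    // and only then are the replies gathered in recipient order.
    static List<String> scatterParallel(String payload, List<String> recipients,
                                        long millis, ExecutorService pool) {
        List<CompletableFuture<String>> futures = recipients.stream()
                .map(r -> CompletableFuture.supplyAsync(() -> slowRecipient(r, payload, millis), pool))
                .collect(Collectors.toList());
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> recipients = List.of("flow2", "flow3", "flow4", "flow5");
        ExecutorService pool = Executors.newFixedThreadPool(recipients.size());
        try {
            System.out.println(scatterSequential("request", recipients, 200));
            System.out.println(scatterParallel("request", recipients, 200, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

In the sequential variant the total time grows with the number of recipients, which matches the Zipkin trace described in the question. In the executor variant the recipients overlap, which is the effect the ExecutorChannel at the start of each sub-flow is meant to achieve.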