Search code examples
spring-integration-dsl

Spring-Integration-DSL: Nested Scatter Gather hangs


Here is a broken, but executable example code:

@Bean
IntegrationFlow testFlow() {
    return IntegrationFlows
            .from(Http.inboundChannelAdapter("test")
                    .requestMapping(mapping -> mapping.methods(HttpMethod.GET))
                    .get())
            .scatterGather(
                    scatterer -> scatterer
                            .applySequence(true)
                            .recipientFlow(flow -> flow
                                    .scatterGather(
                                            scatterer1 -> scatterer1
                                                    .applySequence(true)
                                                    .recipientFlow(IntegrationFlowDefinition::bridge),
                                            gatherer -> gatherer.outputProcessor(MessageGroup::getOne))
                                    .log(INFO, m -> "THIS HAPPENS")),
                    gatherer -> gatherer.outputProcessor(MessageGroup::getOne))
            .log(INFO, m -> "THIS NEVER HAPPENS")
            .get();
}

The expected output is:

THIS HAPPENS
THIS NEVER HAPPENS

The actual output is:

THIS HAPPENS

I found this identical looking issue on Github, but it claims it has been fixed with versions 5.1.10 and 5.2.4. I am running spring-boot-starter-integration 5.5.0 which includes spring-integration-core of the same version.

What can I do to get this nested scatter gather to work ? Is it a bug with the DSL or with my code ?


Solution

  • After having a similar problem using only one level of scatter-gather, I realized it was the log message that was blocking the output from being returned to the parent flow. Replace .log() with .logAndReply() or .log().bridge() and everything should work again.

    Like this:

    @Bean
    IntegrationFlow testFlow() {
        return IntegrationFlows
                .from(Http.inboundChannelAdapter("test")
                        .requestMapping(mapping -> mapping.methods(HttpMethod.GET))
                        .get())
                .scatterGather(
                        scatterer -> scatterer
                                .applySequence(true)
                                .recipientFlow(flow -> flow
                                        .scatterGather(
                                                scatterer1 -> scatterer1
                                                        .applySequence(true)
                                                        .recipientFlow(IntegrationFlowDefinition::bridge),
                                                gatherer -> gatherer.outputProcessor(MessageGroup::getOne))
                                        .logAndReply(INFO, m -> "THIS HAPPENS")), // this fixes the problem
                        gatherer -> gatherer.outputProcessor(MessageGroup::getOne))
                .log(INFO, m -> "THIS NEVER HAPPENS")
                .get();
    }