Search code examples
spring-integration

Using releaseStrategy but flow doesn't continue


I am using the following main flow

    return flow -> flow.channel(REQUEST_INPUT)
            ...
            .enrich(this::calculationEnricher)
            .route(ifMLCallRequired(), routeToMLOrBypassCall())
            .enrich(this::subServiceRequestEnricher)
            ....

and the following subflow

Consumer<RouterSpec<Boolean, MethodInvokingRouter>> routeToMLOrBypassCall() {
    return rs -> rs.resolutionRequired(false)
            .subFlowMapping(true, sf -> sf
                .enrichHeaders(he -> he.headerExpression("corrMLCalls", "T(java.util.UUID).randomUUID().toString()"))
                .scatterGather(rlr -> rlr.applySequence(true)
                                         .recipientFlow(f1 -> f1
                                                .channel(c -> c.executor(executorMLCalls))
                                                .route(ifService1NeedsToBeCalled(), routeToService1OrBypassCall()))
                                         .recipientFlow(f2 -> f2
                                                .channel(c -> c.executor(executorMLCalls))
                                                .enrich(this::service2RequestEnricher)
                                                .enrich(this::service2Enricher))
                                         .recipientFlow(f3 -> f3
                                                .channel(c -> c.executor(executorMLCalls))
                                                .enrich(this::service3RequestEnricher)
                                                .enrich(this::service3Enricher)),
                               agg -> agg.correlationStrategy(msg -> msg.getHeaders().get("corrMLCalls"))
                                         .releaseExpression("size() == 2"),
                               sgs -> sgs.gatherTimeout(gatherTimeout)
                                         .requiresReply(true)
                )
                .handle(...
                )
                .defaultOutputToParentFlow();
}

The service2 and service3 are called always but the service1 depends on some condition. If the service 1 is not called (i.e. size() == 2) then I am experiencing a weird phenomena (well, at least it is strange to me...): after the routeToMLOrBypassCall() the subServiceRequestEnricher should be called but it is not. Only if I change the release condition to size() == 3. I presume it is related to the fact that I created 3 channels for each service and it expects something from each service?

The routeToService1OrBypassCall() looks like

Consumer<RouterSpec<Boolean, MethodInvokingRouter>> routeToService1OrBypassCall()() {
    return rs -> rs.resolutionRequired(false)
            .subFlowMapping(false, sf -> sf.enrich(this::service1RequestEnricher)
                                           .enrich(this::service1Enricher)
            )
            .defaultOutputToParentFlow();
}

Should I add something to the true branch too...?

I appreciate any help! Thank you!

Regards, V.


Solution

  • Your logic is too complicated and it is hard to understand what is going on with such a sub-flows configuration.

    Probably you need to think about replacing your router logic to the simple .filter(). That one can produce a reply on true and won't do anything on false.

    The router has its drawback that it doesn't produce to the reply channel as is and it just ends up in the sub-flow it routes to.