Tags: java, spring-boot, spring-integration, spring-integration-dsl, spring-integration-http

How to call subflows after aggregate() method in scatter-gather pattern in Spring Integration


Here I'm using the scatter-gather pattern. How do I call another IntegrationFlow after aggregate() and before to()? Can I use recipientFlow here so that the flow is conditional as well?

    @Bean
    public IntegrationFlow flow() {
        return flow ->
            flow.handle(validatorService, "validateRequest")
                .split()
                .channel(c -> c.executor(Executors.newCachedThreadPool()))
                .scatterGather(
                    scatterer ->
                        scatterer
                            .applySequence(true)
                            .recipientFlow(flow1())
                            .recipientFlow(flow2())
                            .recipientFlow(flow3()),
                    gatherer ->
                        gatherer
                            .releaseLockBeforeSend(true)
                            .releaseStrategy(group -> group.size() == 2))
                .aggregate(lionService.someMethod())
                // here I want to call other integration flows
                .gateway(someFlow())
                .to(someFlow2());
    }

    @Bean
    public IntegrationFlow flow1() {
        return flow ->
            flow.channel(c -> c.executor(Executors.newCachedThreadPool()))
                .enrichHeaders(h -> h.errorChannel("flow1ErrorChannel", true))
                .handle(cdRequestService, "prepareCDRequestFromLoanRequest");
    }

    // flow2 and flow3 are defined the same way; every flow sets a custom error channel header

    @Bean
    public IntegrationFlow someFlow() {
        return flow ->
            // SpEL string literals use single quotes, so the Java string stays well-formed
            flow.filter("headers.sourceSystemCode.equals('001')")
                .channel(c -> c.executor(Executors.newCachedThreadPool()))
                .enrichHeaders(h -> h.errorChannel("someFlow1ErrorChannel", true))
                .handle(Http.outboundGateway("http://localhost:4444/test2")
                    .httpMethod(HttpMethod.POST)
                    .expectedResponseType(String.class))
                .bridge();
    }

Until now, whenever an error occurred in any of the flows, it went to the custom error channel assigned to that flow, and I processed the error there. But now that I call someFlow() through .gateway(someFlow()), an error occurring in that flow does not go to its assigned error channel. How do I resolve that?

Inside the error handler class I do something like this:

//errorhandlerclass

    @ServiceActivator(inputChannel = "flow1ErrorChannel")
    public Message<?> processDBError(MessagingException payload) {
        logger.atSevere().withStackTrace(StackSize.FULL).withCause(payload).log(
            Objects.requireNonNull(payload.getFailedMessage()).toString());
        MessageHeaders messageHeaders = Objects.requireNonNull(payload.getFailedMessage()).getHeaders();
        return MessageBuilder.withPayload(
                new LionException(ErrorCode.DATABASE_ERROR.getErrorData()))
            .setHeader(MessageHeaders.REPLY_CHANNEL, messageHeaders.get("originalErrorChannel"))
            .build();
    }

    // Two methods with the same name and parameter list cannot coexist in one class,
    // so this handler needs its own name.
    @ServiceActivator(inputChannel = "someFlow1ErrorChannel")
    public Message<?> processSomeFlowError(MessagingException payload) {
        logger.atSevere().withStackTrace(StackSize.FULL).withCause(payload).log(
            Objects.requireNonNull(payload.getFailedMessage()).toString());
        MessageHeaders messageHeaders = Objects.requireNonNull(payload.getFailedMessage()).getHeaders();
        return MessageBuilder.withPayload(
                new LionException(ErrorCode.CUSTOM_ERROR.getErrorData()))
            .setHeader(MessageHeaders.REPLY_CHANNEL, messageHeaders.get("originalErrorChannel"))
            .build();
    }

Again, if there is an error in someFlow(), the error is shown, but I want it to go to the method where I process the error as per my requirement.

Also, as you can see, I've used a filter in someFlow(). When the filter expression evaluates to true there is no problem, but when it evaluates to false an error is thrown. Instead, I want the message to skip that flow and go on to the next step, i.e. .to(someFlow2()). I added .bridge() thinking it would return to the previous context, but that's not happening. I know there's some gap in my understanding. Kindly help with the above two problems.


Solution

  • To call another flow and come back to the main one, you can use a gateway(), but that flow has to return a reply at the end. There is no such thing as a conditional flow: a filter() endpoint (or operator, if you wish) either sends the message to the channel of the next endpoint in the flow or it does not. The to() operator is terminal in the current flow, but you can continue your logic in the destination flow however you want. It looks like you need to spend some time understanding what a message channel is and how it connects endpoints in Spring Integration. The IntegrationFlow is just a logical container for expressing a business task: at runtime it is all endpoints and channels between them.
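
To make the point about gateway() and filter() more concrete, here is a rough toy model in plain Java. It is not Spring Integration code: Msg, SubFlow, filterInside, gateway and conditionOutside are hypothetical names invented for this sketch, and reporting a missing reply as an exception is a simplification (how a real gateway reacts to a missing reply depends on its configuration). It only illustrates why a filter inside a flow called through a gateway can leave the caller without a reply, and how deciding the condition in the main flow avoids that.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Toy model only: none of these types are Spring Integration classes.
final class GatewayFilterSketch {

    /** A message is a payload plus headers. */
    record Msg(String payload, Map<String, String> headers) {}

    /** A sub-flow called through a gateway; an empty result means "no reply was produced". */
    interface SubFlow extends Function<Msg, Optional<Msg>> {}

    /** Filter placed inside the sub-flow: a rejected message is dropped, so nothing is replied. */
    static SubFlow filterInside(Predicate<Msg> filter, UnaryOperator<Msg> handler) {
        return msg -> filter.test(msg) ? Optional.of(handler.apply(msg)) : Optional.empty();
    }

    /** Request-reply call. Simplification: a missing reply is reported as an exception. */
    static Msg gateway(SubFlow subFlow, Msg request) {
        return subFlow.apply(request)
                .orElseThrow(() -> new IllegalStateException("sub-flow produced no reply"));
    }

    /** Condition decided in the main flow: the sub-flow is called only when it applies. */
    static Msg conditionOutside(Predicate<Msg> condition, UnaryOperator<Msg> handler, Msg msg) {
        SubFlow alwaysReplies = m -> Optional.of(handler.apply(m));
        return condition.test(msg) ? gateway(alwaysReplies, msg) : msg;
    }

    public static void main(String[] args) {
        Predicate<Msg> isSource001 = m -> "001".equals(m.headers().get("sourceSystemCode"));
        UnaryOperator<Msg> callHttp = m -> new Msg(m.payload() + " + http reply", m.headers());
        Msg other = new Msg("aggregated", Map.of("sourceSystemCode", "002"));

        // Condition in the main flow: the message skips the sub-flow and moves on unchanged.
        System.out.println(conditionOutside(isSource001, callHttp, other).payload());

        // Filter inside the sub-flow: the gateway has nothing to return to the main flow.
        try {
            gateway(filterInside(isSource001, callHttp), other);
        } catch (IllegalStateException e) {
            System.out.println("gateway failed: " + e.getMessage());
        }
    }
}
```

Running main prints `aggregated` for the message that skips the sub-flow, then `gateway failed: sub-flow produced no reply` for the variant with the filter inside. In Spring Integration terms, this suggests moving the sourceSystemCode check out of someFlow() and into the main flow, so that whatever is called through gateway() always produces a reply.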