Search code examples
spring-integrationspring-integration-dsl

Is there a way to fork the Spring IntegrationFlow using DSL?


I want do something like this where the gateway payload is a String and serviceA & serviceB both return lists.

    final IntegrationFlow flowA = flow -> flow
            .handle(serviceA) 
            .handle((payload, headers) -> payload); // List<Object>

    final IntegrationFlow flowB = flow -> flow
            .handle(serviceB) 
            .handle((payload, headers) -> payload); // List<Object>


    return IntegrationFlows
            .from(myGateway) // String payload
            .forkAndMerge(flowA, flowB, executor)
            .handle((payload, headers) -> payload)
            .get();

Is it possible to fork the flow into two and then gather up the results? Most examples of splitter & aggregators involve splitting up a list.


Solution

  • See the .scatterGather() variants.

    Main docs for the ScatterGatherer here.

    EDIT

    Example:

    @SpringBootApplication
    public class So63605348Application {
    
        private static final Logger log = LoggerFactory.getLogger(So63605348Application.class);
    
        public static void main(String[] args) {
            SpringApplication.run(So63605348Application.class, args);
        }
    
        @Bean
        IntegrationFlow flow(TaskExecutor exec) {
            return IntegrationFlows.from(() -> "foo", e -> e.poller(Pollers.fixedDelay(5000)))
                    .scatterGather(s -> s.applySequence(true)
                            .recipientFlow(subFlow -> subFlow.channel(c -> c.executor(exec))
                                    .<String>handle((p, h) -> {
                                        log.info(p.toString());
                                        return p.toUpperCase();
                                    }))
                            .recipientFlow(subFlow -> subFlow.channel(c -> c.executor(exec))
                                    .<String>handle((p, h) -> {
                                        log.info(p.toString());
                                        return p + p;
                                    })))
                    .handle(System.out::println)
                    .get();
        }
    
        @Bean
        public TaskExecutor exec() {
            ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
            exec.setCorePoolSize(2);
            return exec;
        }
    
    }
    

    Result

    2020-08-26 17:33:56.769  INFO 50829 --- [         exec-1] com.example.demo.So63605348Application   : foo
    2020-08-26 17:33:56.769  INFO 50829 --- [         exec-2] com.example.demo.So63605348Application   : foo
    GenericMessage [payload=[foofoo, FOO], headers=...
    

    EDIT2

    If you prefer not to nest the subflows, you can factor them out...

    @Bean
    IntegrationFlow flow(TaskExecutor exec) {
        return IntegrationFlows.from(() -> "foo", e -> e.poller(Pollers.fixedDelay(5000)))
                .scatterGather(s -> s.applySequence(true)
                        .recipientFlow(flow2())
                        .recipientFlow(flow3()))
                .handle(System.out::println)
                .get();
    }
    
    @Bean
    IntegrationFlow flow2() {
        return f -> f
            .<String>handle((p, h) -> {
                log.info(p.toString());
                return p + p;
            });
    }
    
    @Bean
    IntegrationFlow flow3() {
        return f -> f
            .<String>handle((p, h) -> {
                log.info(p.toString());
                return p.toUpperCase();
            });
    }
    

    Or you can use the pub/sub channel variant...

    @Bean
    IntegrationFlow flow() {
        return IntegrationFlows.from(() -> "foo", e -> e.poller(Pollers.fixedDelay(5000)))
                .scatterGather(pubSub())
                .handle(System.out::println)
                .get();
    }
    
    @Bean
    PublishSubscribeChannel  pubSub() {
        PublishSubscribeChannel pubSub = new PublishSubscribeChannel(exec());
        pubSub.setApplySequence(true);
        return pubSub;
    }
    
    @Bean
    IntegrationFlow flow2() {
        return IntegrationFlows.from("pubSub")
            .<String>handle((p, h) -> {
                log.info(p.toString());
                return p + p;
            })
            .get();
    }
    
    @Bean
    IntegrationFlow flow3() {
        return IntegrationFlows.from("pubSub")
            .<String>handle((p, h) -> {
                log.info(p.toString());
                return p.toUpperCase();
            })
            .get();
    }