
Spring Integration parallel split


I have code like this. Is it possible to control the ordering of the first split?

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        return executor;
    }

    @Bean
    public IntegrationFlow firstFlow() {
        return IntegrationFlows.from("firstChannel")
                .split()
                .channel("inputChannel")
                .get();
    }

    @Bean
    public IntegrationFlow inputFlow() {
        return IntegrationFlows.from("inputChannel")
                .channel(MessageChannels.executor(taskExecutor()))
                .split()
                .handle(this::mapping)
                .aggregate()
                .channel("aggregateChannel")
                .get();
    }

    @Bean
    public IntegrationFlow aggregateFlow() {
        return IntegrationFlows.from("aggregateChannel")
                .aggregate()
                .get();
    }

I want the "mapping" method to be handled asynchronously, but the second message from the first split should be handled and sent to inputChannel only after the first one has appeared in aggregateChannel.


Solution

  • Here is a unit test for a possible solution:

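    import static org.assertj.core.api.Assertions.assertThat;

    import java.util.List;
    import java.util.Map;

    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.BeanFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.task.TaskExecutor;
    import org.springframework.integration.config.EnableIntegration;
    import org.springframework.integration.core.MessagingTemplate;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.MessageChannels;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

    @SpringJUnitConfig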
    public class So75547720Tests {
    
        @Autowired
        BeanFactory beanFactory;
    
        @Test
        void sequentialSplitButSubSplitParallel() {
            List<String> firstList = List.of("1", "2", "3", "4");
            List<String> secondList = List.of("5", "6", "7", "8");
            List<List<String>> testData = List.of(firstList, secondList);
    
            MessagingTemplate messagingTemplate = new MessagingTemplate();
            messagingTemplate.setBeanFactory(this.beanFactory);
    
            List<List<String>> result = messagingTemplate.convertSendAndReceive("firstChannel", testData, List.class);
            assertThat(result).isNotNull().hasSize(2);
            assertThat(result.get(0)).hasSameElementsAs(firstList);
            assertThat(result.get(1)).hasSameElementsAs(secondList);
            System.out.println(result);
        }
    
        @Configuration
        @EnableIntegration
        public static class TestConfiguration {
    
            @Bean
            public TaskExecutor taskExecutor() {
                ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
                executor.setCorePoolSize(5);
                return executor;
            }
    
    
            @Bean
            public IntegrationFlow firstFlow() {
                return IntegrationFlow.from("firstChannel")
                        .split()
                        .channel("inputChannel")
                        .get();
            }
    
            @Bean
            public IntegrationFlow inputFlow() {
                return IntegrationFlow.from("inputChannel")
                        .gateway(subFlow -> subFlow
                                .split()
                                .channel(MessageChannels.executor(taskExecutor()))
                                .handle(this::mapping)
                                .aggregate())
                        .channel("aggregateChannel")
                        .get();
            }
    
            @Bean
            public IntegrationFlow aggregateFlow() {
                return IntegrationFlow.from("aggregateChannel")
                        .aggregate()
                        .get();
            }
    
            private String mapping(String payload, Map<String, ?> headers) {
                System.out.println("Handling thread: " + Thread.currentThread().getName() + " for: " + payload);
                return payload.toUpperCase();
            }
    
        }
    
    }
    

    The first split() emits items sequentially into inputChannel. The gateway then sends each item into a sub-flow and blocks until the reply arrives; only then is that reply passed on to aggregateChannel. The interesting part is the sub-flow: its second splitter emits items in parallel, because they are handed off through an Executor channel. The inner aggregator does not emit until it has gathered all the items of the current split, and only after that does the top-level splitter move on to its next item.
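
    To make the control flow easier to see in isolation, here is a minimal sketch of the same idea in plain java.util.concurrent, without Spring Integration. It is only an analogy, not how the framework implements the gateway: the class name SequentialOuterParallelInner and the methods process() and mapping() are made up for illustration. The outer loop plays the role of the first split(), the futures play the role of the Executor channel, and join() plays the role of the gateway waiting for the aggregated reply. One difference to keep in mind: this sketch collects results in submission order, whereas the aggregator in the flow collects them in arrival order.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class SequentialOuterParallelInner {

        // Hypothetical stand-in for the mapping() handler used in the flow.
        static String mapping(String payload) {
            return payload.toUpperCase();
        }

        static List<List<String>> process(List<List<String>> batches, ExecutorService executor) {
            List<List<String>> results = new ArrayList<>();
            // Outer "split": batches are taken strictly one after another.
            for (List<String> batch : batches) {
                List<CompletableFuture<String>> futures = new ArrayList<>();
                // Inner "split": every item of the current batch is submitted to the pool.
                for (String item : batch) {
                    futures.add(CompletableFuture.supplyAsync(() -> mapping(item), executor));
                }
                // "Aggregate": block until every item of this batch is done,
                // like the gateway waiting for the sub-flow reply.
                List<String> aggregated = new ArrayList<>();
                for (CompletableFuture<String> future : futures) {
                    aggregated.add(future.join());
                }
                // Only now does the loop move on to the next batch.
                results.add(aggregated);
            }
            return results;
        }

        public static void main(String[] args) {
            ExecutorService executor = Executors.newFixedThreadPool(5);
            try {
                List<List<String>> input = List.of(List.of("a", "b"), List.of("c", "d"));
                System.out.println(process(input, executor)); // prints [[A, B], [C, D]]
            } finally {
                executor.shutdown();
            }
        }
    }
    ```

    Because the loop blocks on every future of a batch before starting the next one, no item of the second batch can be submitted while the first batch is still being handled, which is the ordering the question asks for.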

    The test output might look like this (the order within each group can vary between runs, because the items are handled in parallel):

    Handling thread: taskExecutor-2 for: 2
    Handling thread: taskExecutor-1 for: 1
    Handling thread: taskExecutor-3 for: 3
    Handling thread: taskExecutor-4 for: 4
    Handling thread: taskExecutor-2 for: 6
    Handling thread: taskExecutor-5 for: 5
    Handling thread: taskExecutor-3 for: 7
    Handling thread: taskExecutor-1 for: 8
    [[2, 3, 1, 4], [6, 5, 7, 8]]