I have code like this. Is it possible to control the ordering of the first split?
```java
@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10);
    executor.setMaxPoolSize(50);
    return executor;
}

@Bean
public IntegrationFlow firstFlow() {
    return IntegrationFlows.from("firstChannel")
            .split()
            .channel("inputChannel")
            .get();
}

@Bean
public IntegrationFlow inputFlow() {
    return IntegrationFlows.from("inputChannel")
            .channel(MessageChannels.executor(taskExecutor()))
            .split()
            .handle(this::mapping)
            .aggregate()
            .channel("aggregateChannel")
            .get();
}

@Bean
public IntegrationFlow aggregateFlow() {
    return IntegrationFlows.from("aggregateChannel")
            .aggregate()
            .get();
}
```
I want the `mapping` method to be handled asynchronously, but processing of the second message from the first split (and sending it to `inputChannel`) should start only after the first one has arrived in `aggregateChannel`.
So, here is a unit test for a possible solution:
```java
import java.util.List;
import java.util.Map;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import static org.assertj.core.api.Assertions.assertThat;

@SpringJUnitConfig
public class So75547720Tests {

    @Autowired
    BeanFactory beanFactory;

    @Test
    void sequentialSplitButSubSplitParallel() {
        List<String> firstList = List.of("1", "2", "3", "4");
        List<String> secondList = List.of("5", "6", "7", "8");
        List<List<String>> testData = List.of(firstList, secondList);

        MessagingTemplate messagingTemplate = new MessagingTemplate();
        messagingTemplate.setBeanFactory(this.beanFactory);

        List<List<String>> result = messagingTemplate.convertSendAndReceive("firstChannel", testData, List.class);
        assertThat(result).isNotNull().hasSize(2);
        assertThat(result.get(0)).hasSameElementsAs(firstList);
        assertThat(result.get(1)).hasSameElementsAs(secondList);
        System.out.println(result);
    }

    @Configuration
    @EnableIntegration
    public static class TestConfiguration {

        @Bean
        public TaskExecutor taskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(5);
            return executor;
        }

        @Bean
        public IntegrationFlow firstFlow() {
            return IntegrationFlow.from("firstChannel")
                    .split()
                    .channel("inputChannel")
                    .get();
        }

        @Bean
        public IntegrationFlow inputFlow() {
            return IntegrationFlow.from("inputChannel")
                    .gateway(subFlow -> subFlow
                            .split()
                            .channel(MessageChannels.executor(taskExecutor()))
                            .handle(this::mapping)
                            .aggregate())
                    .channel("aggregateChannel")
                    .get();
        }

        @Bean
        public IntegrationFlow aggregateFlow() {
            return IntegrationFlow.from("aggregateChannel")
                    .aggregate()
                    .get();
        }

        private String mapping(String payload, Map<String, ?> headers) {
            System.out.println("Handling thread: " + Thread.currentThread().getName() + " for: " + payload);
            return payload.toUpperCase();
        }
    }
}
```
The first `split()` emits items sequentially into that `inputChannel`. Then we use a `gateway()` for the sub-flow: the gateway waits for a reply before pushing it forward to the next `aggregateChannel`. The interesting part is indeed in that sub-flow, where a second splitter emits items in parallel via an executor channel. The inner aggregator won't emit until it has gathered all the items for the current split, and only after that do we go on to the next item from the top-level split.
The result of the test might be like this:

```
Handling thread: taskExecutor-2 for: 2
Handling thread: taskExecutor-1 for: 1
Handling thread: taskExecutor-3 for: 3
Handling thread: taskExecutor-4 for: 4
Handling thread: taskExecutor-2 for: 6
Handling thread: taskExecutor-5 for: 5
Handling thread: taskExecutor-3 for: 7
Handling thread: taskExecutor-1 for: 8
[[2, 3, 1, 4], [6, 5, 7, 8]]
```
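Note that within each sub-list the elements arrive in completion order, not the original order, which is why the test uses `hasSameElementsAs()` rather than an exact comparison. If the original order matters, one option (an untested sketch, not part of the answer above) is to give the inner aggregator a custom output processor that sorts by the sequence-number header added by the splitter:

```java
// Hypothetical variant of the inner .aggregate(): restore the original
// order of each batch by sorting on the split's sequence-number header.
.aggregate(a -> a.outputProcessor(group -> group.getMessages().stream()
        .sorted(Comparator.comparingInt(m ->
                new IntegrationMessageHeaderAccessor(m).getSequenceNumber()))
        .map(Message::getPayload)
        .collect(Collectors.toList())))
```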