spring-integration, spring-integration-dsl

Run multiple Spring Integration Flows in parallel


I have an application that polls multiple directories and then sends job requests to Spring Batch. Every directory is registered as a separate flow. Is it possible to run these flows in parallel? I need this because every directory belongs to a different business entity: when one flow is stuck on a malformed file, or the MQ broker for a particular entity is unavailable, the others need to continue working.
I register the flows with IntegrationFlowContext:

@Configuration
@RequiredArgsConstructor
@Slf4j
public class IntegrationConfigSO implements CommandLineRunner {
    
    private final HalFileAdapterConfig config;
    private final JobRepository jobRepository;
    private final BatchJobs batchJobs;
    private final ApplicationIntegrationEventPublisher eventPublisher;
    private final IntegrationFlowContext flowContext;
    
    @Override
    public void run(String... args) throws Exception {
        registerFlows();
    }
    
    public void registerFlows() {
        Arrays.stream(config.getSystemsEnabled())
                .map(this::flow)
                .forEach(flow -> flowContext.registration(flow)
                        .id(UUID.randomUUID().toString())
                        .useFlowIdAsPrefix()
                        .register()
                );
        
    }
    
    public IntegrationFlow flow(String systemId) {
        return IntegrationFlows
                .from(
                        fileReadingMessageSource(systemId),
                        c -> c.poller(Pollers.fixedDelay(config.getPollTimeSeconds(), TimeUnit.SECONDS)
                                .maxMessagesPerPoll(config.getMaxFilesPerPoll())))
                .transform(fileMessageToJobRequest())
                .handle(jobLaunchingGateway())
                .channel("jobReplyChannel")
                .get();
    }
    
    
    public MessageSource<File> fileReadingMessageSource(String systemId) {
        FileReadingMessageSource source = new FileReadingMessageSource(getCustomFileComparator());
        source.setAutoCreateDirectory(true);
        source.setDirectory(new File(config.getBaseDirectory() + File.separatorChar + systemId));
        source.setScanner(directoryScanner());
        return source;
    }
    
    @Bean
    public DirectoryScanner directoryScanner() {
        CustomRecursiveDirScanner scanner = new CustomRecursiveDirScanner(config);
        CompositeFileListFilter<File> filters = new CompositeFileListFilter<>();
        filters.addFilter(new AcceptOnceFileListFilter<>());
        scanner.setFilter(filters);
        return scanner;
    }
    
    @Bean
    public FileMessageToJobRequest fileMessageToJobRequest() {
        FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest(config, eventPublisher);
        fileMessageToJobRequest.setJob(batchJobs.job());
        return fileMessageToJobRequest;
    }
    
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public JobLaunchingGateway jobLaunchingGateway() {
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        simpleJobLauncher.setJobRepository(jobRepository);
        // SyncTaskExecutor runs the job on the calling thread, here the poller's scheduler thread
        simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
        JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
        jobLaunchingGateway.setOutputChannel(jobReplyChannel());
        return jobLaunchingGateway;
    }
    
    @Bean
    public MessageChannel jobReplyChannel() {
        return new DirectChannel();
    }
    
}

Solution

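  • Yes, this is a valid and workable use case. Pollers in Spring Integration run on the TaskScheduler and its thread pool, so a flow that does its work on the poller thread occupies one scheduler thread for as long as it runs. To be sure that all your flows can poll in parallel, and that a stuck flow does not starve the others, you need to make that thread pool big enough, ideally at least one thread per polled directory.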

    See docs for more info: https://docs.spring.io/spring-integration/docs/current/reference/html/configuration.html#namespace-taskscheduler

    There is also a global integration property, spring.integration.taskScheduler.poolSize, described in the next section of that document.

    If you use Spring Boot, see the TaskScheduler auto-configuration: https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#features.task-execution-and-scheduling
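
The effect of the pool size can be sketched with plain JDK classes. This is only an analogy, not Spring Integration code: it assumes the TaskScheduler behaves like the ScheduledThreadPoolExecutor that a ThreadPoolTaskScheduler wraps, and the class and method names (PollerPoolDemo, healthyPollerRuns) are hypothetical. One "poller" blocks, as a flow with a synchronous job launch might on a bad file, and the sketch checks whether a second poller still gets a thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PollerPoolDemo {

    /**
     * Returns true if a healthy poller gets to run while another poller
     * is blocked, given a scheduler with the requested number of threads.
     */
    public static boolean healthyPollerRuns(int poolSize) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(poolSize);
        CountDownLatch stuckStarted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch healthyRan = new CountDownLatch(1);
        try {
            // Poller 1: stands in for a flow stuck on a malformed file or a missing broker.
            scheduler.scheduleWithFixedDelay(() -> {
                stuckStarted.countDown();
                try {
                    release.await(); // blocks its scheduler thread until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, 0, 100, TimeUnit.MILLISECONDS);

            // Make sure the stuck poller already holds a thread before adding the next one.
            stuckStarted.await();

            // Poller 2: a healthy flow that only needs a free scheduler thread.
            scheduler.scheduleWithFixedDelay(healthyRan::countDown, 0, 100, TimeUnit.MILLISECONDS);

            // Give the healthy poller half a second to run at least once.
            return healthyRan.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            release.countDown();
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("pool size 1: healthy poller ran = " + healthyPollerRuns(1)); // false
        System.out.println("pool size 2: healthy poller ran = " + healthyPollerRuns(2)); // true
    }
}
```

With a single thread the blocked poller holds the only worker, so the second poller never fires. With two threads it runs normally. In the application above the equivalent change would be to raise the scheduler pool size (through spring.integration.taskScheduler.poolSize, or through Spring Boot's spring.task.scheduling.pool.size when the auto-configured scheduler is used) to at least the number of polled directories.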