
Run a Spring Integration flow concurrently for each FTP file


I have an integration flow configured with the Java DSL that pulls files from an FTP server using Ftp.inboundAdapter, transforms each one to a JobRequest, and then triggers my batch job in a .handle() method. Everything works as required, but the files inside the FTP folder are processed sequentially.

I logged the current thread name in my transformer endpoint, and it printed the same thread name for each file.

Here is what I have tried so far:

1. Task executor bean

    @Bean
    public TaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor("Integration");
    }

2. Integration flow

    @Bean
    public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) throws IOException {
        return IntegrationFlows.from(Ftp.inboundAdapter(myFtpSessionFactory)
                                .remoteDirectory("/bar")
                                .localDirectory(localDir.getFile()),
                        c -> c.poller(Pollers.fixedRate(1000).taskExecutor(taskExecutor()).maxMessagesPerPoll(20)))
                .transform(fileMessageToJobRequest(importUserJob(step1())))
                .handle(jobLaunchingGateway)
                .log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
                .route(JobExecution.class, j -> j.getStatus().isUnsuccessful() ? "jobFailedChannel" : "jobSuccessfulChannel")
                .get();
    }

3. I also read in another SO thread that I need an ExecutorChannel, so I configured one, but I don't know how to inject this channel into my Ftp.inboundAdapter. From the logs I see that the channel is always integrationFlow.channel#0, which I guess is a DirectChannel.

    @Bean
    public MessageChannel inputChannel() {
        return new ExecutorChannel(taskExecutor());
    }

I don't know what I'm missing here, or I might not have properly understood the Spring messaging system, as I'm very new to Spring and Spring Integration.

Any help is appreciated

Thanks


Solution

  • You can simply inject the ExecutorChannel into the flow, and the framework applies it as the output channel of the SourcePollingChannelAdapter. So, with inputChannel defined as a bean, you just add this:

    .channel(inputChannel())

    before your .transform(fileMessageToJobRequest(importUserJob(step1()))). See more in the docs: https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-channels
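As a rough sketch, the question's flow with the channel added could look like the fragment below. It is not a standalone program: it assumes it sits inside the question's configuration class, and myFtpSessionFactory, localDir, taskExecutor(), inputChannel(), fileMessageToJobRequest(...), importUserJob(...) and step1() are all the beans and fields from the question. Only the .channel(...) line is new.

```java
// Fragment of the question's @Configuration class; every referenced bean comes from the question.
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) throws IOException {
    return IntegrationFlows.from(Ftp.inboundAdapter(myFtpSessionFactory)
                            .remoteDirectory("/bar")
                            .localDirectory(localDir.getFile()),
                    c -> c.poller(Pollers.fixedRate(1000).taskExecutor(taskExecutor()).maxMessagesPerPoll(20)))
            // New line: the polling adapter now sends each file to the ExecutorChannel,
            // so the steps below run on threads supplied by taskExecutor().
            .channel(inputChannel())
            .transform(fileMessageToJobRequest(importUserJob(step1())))
            .handle(jobLaunchingGateway)
            .log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
            .route(JobExecution.class, j -> j.getStatus().isUnsuccessful() ? "jobFailedChannel" : "jobSuccessfulChannel")
            .get();
}
```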

    Alternatively, to process your files in parallel with the .taskExecutor(taskExecutor()) configuration you already have, you just need to set maxMessagesPerPoll to 1 instead of 20. The logic in AbstractPollingEndpoint is like this:

    this.taskExecutor.execute(() -> {
        int count = 0;
        while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
            if (pollForMessage() == null) {
                break;
            }
            count++;
        }
    });
    

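    So we do have tasks in parallel, but each task handles up to maxMessagesPerPoll messages one after another on its own thread, and a new task starts only with the next poll. With 20 in your current case, up to 20 files are processed sequentially on the same thread. There is also some explanation in the docs: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#endpoint-pollingconsumer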

    The maxMessagesPerPoll property specifies the maximum number of messages to receive within a given poll operation. This means that the poller continues calling receive() without waiting, until either null is returned or the maximum value is reached. For example, if a poller has a ten-second interval trigger and a maxMessagesPerPoll setting of 25, and it is polling a channel that has 100 messages in its queue, all 100 messages can be retrieved within 40 seconds. It grabs 25, waits ten seconds, grabs the next 25, and so on.
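To make the effect of the loop quoted above concrete, here is a minimal plain-Java simulation, not Spring Integration code. The class PollLoopSketch and the helper simulate(...) are hypothetical names invented for this sketch. Each poll gets its own thread, roughly as SimpleAsyncTaskExecutor would provide, and the in-memory queue stands in for the FTP source. The sketch waits for each poll task to finish before starting the next one purely to keep the output deterministic; the real poller does not wait.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

class PollLoopSketch {

    // Hypothetical helper: runs `polls` poll cycles and returns thread name -> files it handled.
    static Map<String, List<String>> simulate(List<String> files, int maxMessagesPerPoll, int polls)
            throws InterruptedException {
        Queue<String> source = new ArrayDeque<>(files);          // stands in for the FTP source
        Map<String, List<String>> handledBy = new LinkedHashMap<>();
        for (int p = 1; p <= polls; p++) {
            // One thread per poll, named like the question's "Integration" executor threads.
            Thread task = new Thread(() -> {
                int count = 0;
                // Same shape as the quoted loop: keep receiving until empty or the limit is hit.
                while (maxMessagesPerPoll <= 0 || count < maxMessagesPerPoll) {
                    String file = source.poll();
                    if (file == null) {
                        break;
                    }
                    handledBy.computeIfAbsent(Thread.currentThread().getName(),
                            k -> new ArrayList<>()).add(file);
                    count++;
                }
            }, "Integration" + p);
            task.start();
            task.join(); // demo-only: serialize polls so the result is deterministic
        }
        return handledBy;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> files = Arrays.asList("a.csv", "b.csv", "c.csv", "d.csv", "e.csv");
        // maxMessagesPerPoll = 20: the first poll task drains every file on one thread.
        System.out.println(simulate(files, 20, 5));
        // maxMessagesPerPoll = 1: each poll task takes exactly one file, so five threads are used.
        System.out.println(simulate(files, 1, 5));
    }
}
```

In the question's flow, the second run corresponds to changing the poller to Pollers.fixedRate(1000).taskExecutor(taskExecutor()).maxMessagesPerPoll(1).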