Search code examples
javaspringspring-integrationspring-integration-sftp

Spring Integration sftp new file not polled till message handler goes through the list of all files cached on local


    @Bean
    public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost(sftpHost);
        factory.setPort(sftpPort);
        factory.setUser(sftpUser);
//      if (sftpPrivateKey != null) {
//          factory.setPrivateKey(sftpPrivateKey);
//          factory.setPrivateKeyPassphrase(sftpPrivateKeyPassphrase);
//      }
        factory.setPassword(sftpPassword);
        factory.setAllowUnknownKeys(true);

        return new CachingSessionFactory<>(factory);
    }
    

SftpInboundConfig.java

package com.shail.sftp.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.core.MessageSource;


import org.springframework.integration.file.filters.AcceptOnceFileListFilter;

import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.sftp.filters.SftpSimplePatternFileListFilter;
import org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizer;
import org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizingMessageSource;

import org.springframework.messaging.MessageHandler;

import java.io.File;

@Configuration
public class SftpInboundConfig {

    @Autowired
    private SessionFactory session;
    @Bean
    public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
        SftpInboundFileSynchronizer synchronizer = new SftpInboundFileSynchronizer(session);
        synchronizer.setDeleteRemoteFiles(true); 
        synchronizer.setRemoteDirectory("/home/shail/sftp_dir/patient/");
        synchronizer.setFilter(new SftpSimplePatternFileListFilter("*.*")); // File filter pattern
        return synchronizer;
    }


    @Bean
    @InboundChannelAdapter(value = "sftpChannel", poller = @Poller(fixedRate = "60000")) // Poll every 60 seconds
    public MessageSource<File> sftpInboundAdapter() {
        SftpInboundFileSynchronizingMessageSource source =
                new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
        source.setLocalDirectory(new File("src/main/resources/localDownload/patient/"));
        source.setAutoCreateLocalDirectory(true);
        //source.setLocalFilter(new AcceptOnceFileListFilter<>());
        source.setMaxFetchSize(-1); // Set to a higher value if you want to fetch multiple files at a time
        return source;
    }



    @Bean
    @ServiceActivator(inputChannel = "sftpChannel")
    public MessageHandler sftpMessageHandler() {
        return message -> {
            // Handle the downloaded file (e.g., process or save it)
            File file = (File) message.getPayload();
            System.out.println("Received file: " + file.getName() + "  @  " + LocalDateTime.now());
        };
    }
}

When the application is started, during the first poll all the files are getting downloaded in the local directory; but the message handler is receiving one file in each Poll i.e, in console at every 1 minute i see the log Received file: Product_1_20231107162209.json @ 2023-11-07T17:01:16.124739600 .
And in between if new files are added in the sftp server, then the service (code) does not downloads new files till the MessageHandler has gone through the list of all files downloaded in the initial caching.

Please see the below screenshot of logs and file explorer for more details on timing behavior

FileExplorer
enter image description here

Console Log
enter image description here

What I am trying to do:

I want to implement the inbound adapter so that in each polling all the on the sftp server should be downloaded to local directory, meaning when the first polling happened at 17:12, the first 5 files should have been downloaded and at 17:14 when the second set of files were added on sftp server, they all should have been downloaded at 17:15 instead of waiting till 17:17 for MessageHandler to go through the list of files received at 17:12.Could you please suggest how to handle this issue?

What I have tried:

I tried increasing decreasing the maxFetchSize and also tried removing the AcceptOnceFileListFilter but this didn't helped.


Solution

  • The @Poller for the @InboundChannelAdapter polls one message per task. Consider to configure it like maxMessagesPerPoll = "-1". This way all the available files are going to be processed in a single polling task. Then, since there is no local files, it is going to request for more from SFTP on the next polling cycle. In your case at about after a minute since fixedRate means to start a new polling task just after this one has started.