Search code examples
spring-integration, spring-cloud-stream, spring-cloud-dataflow

Spring Cloud App Starter, sftp source, recurse a directory for files


I am using the SFTP Source in Spring Cloud Data Flow, and it works for getting the files defined in sftp:remote-dir:/home/someone/source. Now I have many subfolders under the remote-dir, and I want to recursively get all the files under this directory which match the pattern. I am trying to use filename-regex:, but so far it only works on one level. How do I recursively get the files I need?


Solution

  • The inbound channel adapter does not support recursion; use a custom source with the outbound gateway with an MGET command, with recursion (-R).

    The doc is missing that option; fixed in the current docs.

    I opened an issue to create a standard app starter.

    EDIT

    With the Java DSL...

    /**
     * Custom source application (Java DSL variant) that fetches remote files
     * recursively over SFTP.
     *
     * <p>A polled message source emits the remote pattern {@code "foo/*"}; the
     * SFTP outbound gateway runs an {@code mget} with the recursive option into
     * {@code /tmp/foo}; the result is split, files whose absolute path has
     * already been recorded are dropped, and the remaining files are converted
     * to byte arrays and sent to {@link Source#OUTPUT}.
     */
    @SpringBootApplication
    @EnableBinding(Source.class)
    public class So44710754Application {

        /**
         * Absolute paths of files that have already been passed downstream.
         */
        // should store in Redis or similar for persistence
        private final ConcurrentMap<String, Boolean> processed = new ConcurrentHashMap<>();

        /**
         * Boots the application.
         *
         * @param args command-line arguments handed to Spring Boot
         */
        public static void main(String[] args) {
            SpringApplication.run(So44710754Application.class, args);
        }

        /**
         * Wires the whole pipeline: poll, mget, split, de-duplicate, convert, output.
         *
         * @return the integration flow definition
         */
        @Bean
        public IntegrationFlow flow() {
            return IntegrationFlows
                    .from(source(), adapter -> adapter.poller(Pollers.fixedDelay(30_000)))
                    .handle(gateway())
                    .split()
                    // putIfAbsent returns null only the first time a path is recorded,
                    // so each local file passes this filter at most once per JVM run
                    .<File>filter(file -> this.processed.putIfAbsent(file.getAbsolutePath(), true) == null)
                    .transform(Transformers.fileToByteArray())
                    .channel(Source.OUTPUT)
                    .get();
        }

        /**
         * Supplies the message that drives each poll; its payload is the remote
         * pattern the gateway evaluates.
         *
         * @return a message source that always produces {@code "foo/*"}
         */
        private MessageSource<String> source() {
            MessageSource<String> remotePattern = () -> new GenericMessage<>("foo/*");
            return remotePattern;
        }

        /**
         * Builds the SFTP outbound gateway: {@code mget} on the expression
         * {@code payload}, recursive, downloading into {@code /tmp/foo}.
         *
         * @return the configured gateway
         */
        private AbstractRemoteFileOutboundGateway<LsEntry> gateway() {
            AbstractRemoteFileOutboundGateway<LsEntry> mget =
                    Sftp.outboundGateway(sessionFactory(), "mget", "payload")
                            .localDirectory(new File("/tmp/foo"))
                            .options(Option.RECURSIVE)
                            .get();
            // NOTE(review): IGNORE presumably leaves already-downloaded local files untouched - confirm
            mget.setFileExistsMode(FileExistsMode.IGNORE);
            return mget;
        }

        /**
         * Creates the SFTP session factory, wrapped in a caching factory.
         *
         * @return a caching session factory for the configured host and user
         */
        private SessionFactory<LsEntry> sessionFactory() {
            DefaultSftpSessionFactory sftp = new DefaultSftpSessionFactory();
            sftp.setHost("10.0.0.3");
            sftp.setUser("ftptest");
            sftp.setPassword("ftptest");
            sftp.setAllowUnknownKeys(true);
            return new CachingSessionFactory<>(sftp);
        }

    }
    

    And with Java config...

    /**
     * Custom source application (annotation-based configuration variant) that
     * fetches remote files recursively over SFTP.
     *
     * <p>Message path, by channel name: {@code sftpGate} (polled remote
     * pattern) to the mget gateway, then {@code splitterChannel},
     * {@code filterChannel}, {@code toBytesChannel}, and finally
     * {@link Source#OUTPUT}.
     */
    @SpringBootApplication
    @EnableBinding(Source.class)
    public class So44710754Application {

        /**
         * Absolute paths of files that have already been passed downstream.
         */
        // should store in Redis, Zookeeper, or similar for persistence
        private final ConcurrentMap<String, Boolean> processed = new ConcurrentHashMap<>();

        /**
         * Boots the application.
         *
         * @param args command-line arguments handed to Spring Boot
         */
        public static void main(String[] args) {
            SpringApplication.run(So44710754Application.class, args);
        }

        /**
         * Polled entry point; the returned value is sent to {@code sftpGate}
         * and evaluated by the gateway as the remote pattern.
         *
         * @return the remote pattern {@code "foo/*"}
         */
        @InboundChannelAdapter(channel = "sftpGate", poller = @Poller(fixedDelay = "30000"))
        public String remoteDir() {
            return "foo/*";
        }

        /**
         * SFTP outbound gateway running {@code mget} with the {@code -R}
         * (recursive) option, downloading into {@code /tmp/foo} and replying on
         * {@code splitterChannel}.
         *
         * @return the configured gateway
         */
        @Bean
        @ServiceActivator(inputChannel = "sftpGate")
        public SftpOutboundGateway mgetGate() {
            SftpOutboundGateway gateway = new SftpOutboundGateway(sessionFactory(), "mget", "payload");
            gateway.setOptions("-R");
            gateway.setLocalDirectory(new File("/tmp/foo"));
            // NOTE(review): IGNORE presumably leaves already-downloaded local files untouched - confirm
            gateway.setFileExistsMode(FileExistsMode.IGNORE);
            gateway.setOutputChannelName("splitterChannel");
            return gateway;
        }

        /**
         * Splits the gateway reply and forwards the parts to {@code filterChannel}.
         *
         * @return the configured splitter
         */
        @Bean
        @Splitter(inputChannel = "splitterChannel")
        public DefaultMessageSplitter splitter() {
            DefaultMessageSplitter fileSplitter = new DefaultMessageSplitter();
            fileSplitter.setOutputChannelName("filterChannel");
            return fileSplitter;
        }

        /**
         * Lets a file through only the first time its absolute path is seen.
         *
         * @param payload the downloaded local file
         * @return {@code true} if the path had not been recorded before this call
         */
        @Filter(inputChannel = "filterChannel", outputChannel = "toBytesChannel")
        public boolean filter(File payload) {
            String path = payload.getAbsolutePath();
            // putIfAbsent returns null only when no earlier entry existed for the path
            Boolean alreadySeen = this.processed.putIfAbsent(path, true);
            return alreadySeen == null;
        }

        /**
         * Transformer between {@code toBytesChannel} and {@link Source#OUTPUT}.
         *
         * @return a new file-to-byte-array transformer
         */
        @Bean
        @Transformer(inputChannel = "toBytesChannel", outputChannel = Source.OUTPUT)
        public FileToByteArrayTransformer toBytes() {
            return new FileToByteArrayTransformer();
        }

        /**
         * Creates the SFTP session factory, wrapped in a caching factory.
         *
         * @return a caching session factory for the configured host and user
         */
        private SessionFactory<LsEntry> sessionFactory() {
            DefaultSftpSessionFactory sftp = new DefaultSftpSessionFactory();
            sftp.setHost("10.0.0.3");
            sftp.setUser("ftptest");
            sftp.setPassword("ftptest");
            sftp.setAllowUnknownKeys(true);
            return new CachingSessionFactory<>(sftp);
        }

    }