Spring SFTP Outbound Adapter - determining when files have been sent


I have a Spring SFTP outbound adapter that I start via "adapter.start()" in my main program. Once started, the adapter uploads all the files in the specified directory as expected. But I want to stop the adapter after all the files have been transferred. How do I detect that all the files have been transferred so I can call adapter.stop()?

@Bean
public IntegrationFlow sftpOutboundFlow() {
    return IntegrationFlows.from(Files.inboundAdapter(new File(sftpOutboundDirectory))
                    .filterExpression("name.endsWith('.pdf') OR name.endsWith('.PDF')")
                    .preventDuplicates(true),
            e -> e.id("sftpOutboundAdapter")
                    .autoStartup(false)
                    .poller(Pollers.trigger(new FireOnceTrigger())
                            .maxMessagesPerPoll(-1)))
            .log(LoggingHandler.Level.INFO, "sftp.outbound", m -> m.getPayload())
            .log(LoggingHandler.Level.INFO, "sftp.outbound", m -> m.getHeaders())
            .handle(Sftp.outboundAdapter(outboundSftpSessionFactory())
                    .useTemporaryFileName(false)
                    .remoteDirectory(sftpRemoteDirectory))
            .get();
}

Solution

  • @Artem Bilan has already given the answer, but here is a concrete implementation of what he said, for those who are new to Spring Integration like me:

    1. Define a service to get the PDF files on demand:
    @Service
    public class MyFileService {
        public List<File> getPdfFiles(final String srcDir) {
            File[] files = new File(srcDir).listFiles((dir, name) -> name.toLowerCase().endsWith(".pdf"));
            return Arrays.asList(files == null ? new File[]{} : files);
        }
    }
    
    2. Define a Gateway to start the SFTP upload flow on demand:
    @MessagingGateway
    public interface SFtpOutboundGateway {
        @Gateway(requestChannel = "sftpOutboundFlow.input")
        void uploadFiles(List<File> files);
    }
    
    3. Define the Integration Flow to upload the files to the SFTP server via Sftp.outboundGateway:
    @Configuration
    @EnableIntegration
    public class FtpFlowIntegrationConfig {
        // could be also bound via @Value 
        private String sftpRemoteDirectory = "/path/to/remote/dir";
    
        @Bean
        public SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory() {
            DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
            factory.setHost("localhost");
            factory.setPort(22222);
            factory.setUser("client1");
            factory.setPassword("password123");
            factory.setAllowUnknownKeys(true);
            return new CachingSessionFactory<>(factory);
        }
    
        @Bean
        public IntegrationFlow sftpOutboundFlow(RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate) {
            return e -> e
                    .log(LoggingHandler.Level.INFO, "sftp.outbound", Message::getPayload)
                    .log(LoggingHandler.Level.INFO, "sftp.outbound", Message::getHeaders)
                    .handle(
                        Sftp.outboundGateway(remoteFileTemplate, AbstractRemoteFileOutboundGateway.Command.MPUT, "payload")
                    );
        }
    
        @Bean
        public RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate(SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory) {
            RemoteFileTemplate<ChannelSftp.LsEntry> template = new SftpRemoteFileTemplate(outboundSftpSessionFactory);
            template.setRemoteDirectoryExpression(new LiteralExpression(sftpRemoteDirectory));
            template.setAutoCreateDirectory(true);
            template.setUseTemporaryFileName(false);
            // initialize only after all properties have been set
            template.afterPropertiesSet();
            return template;
        }
    }
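
The file filter in MyFileService can be tried out without any Spring or SFTP setup. The sketch below is a standalone version using only the JDK; the class name PdfListingDemo and the temporary directory are made up for illustration. It shows that the filter matches ".pdf" regardless of case, and that a missing directory yields an empty list rather than a NullPointerException.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

public class PdfListingDemo {

    // Same logic as MyFileService.getPdfFiles: case-insensitive ".pdf" match;
    // listFiles() returns null for a missing directory, which is mapped to an empty list.
    public static List<File> getPdfFiles(final String srcDir) {
        File[] files = new File(srcDir).listFiles((dir, name) -> name.toLowerCase().endsWith(".pdf"));
        return Arrays.asList(files == null ? new File[0] : files);
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical test data in a throwaway directory
        Path dir = Files.createTempDirectory("pdf-demo");
        Files.createFile(dir.resolve("a.pdf"));
        Files.createFile(dir.resolve("b.PDF"));
        Files.createFile(dir.resolve("notes.txt"));

        // Only the two PDF files are returned; notes.txt is filtered out
        System.out.println(getPdfFiles(dir.toString()).size());

        // A directory that does not exist gives an empty list
        System.out.println(getPdfFiles(dir.resolve("missing").toString()).isEmpty());
    }
}
```

Note that listFiles() gives no ordering guarantee, so sort the list first if the upload order matters.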
    

    Wiring up:

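    @SpringBootApplication
    public class SpringApp {
        public static void main(String[] args) {
            // Assumption: Spring Boot bootstraps the context; the original snippet
            // did not show how ctx is obtained.
            final ConfigurableApplicationContext ctx = SpringApplication.run(SpringApp.class, args);
            final MyFileService fileService = ctx.getBean(MyFileService.class);
            final SFtpOutboundGateway sFtpOutboundGateway = ctx.getBean(SFtpOutboundGateway.class);
            // trigger the sftp upload flow manually - only once
            // (the local source directory below is a placeholder)
            sFtpOutboundGateway.uploadFiles(fileService.getPdfFiles("/path/to/local/dir"));
        }
    }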
    

    Important notes:

    1.

    @Gateway(requestChannel = "sftpOutboundFlow.input") void uploadFiles(List<File> files);

    Here the DirectChannel sftpOutboundFlow.input is used to pass a message whose payload is the List<File> files to the receiver. If this channel has not been created yet, the Gateway creates it implicitly.
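
The DirectChannel matters for the original question. A DirectChannel hands the message to its subscriber on the sender's thread, so the uploadFiles call should return only after the upload handler has run, and no separate "all files sent" check is needed. The sketch below is a plain-Java analogy of that behaviour, not Spring Integration code; DirectHandoff and uploadAll are hypothetical names, and the "upload" just records a remote path.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class DirectDispatchDemo {

    // Hypothetical stand-in for a direct channel: send() runs the subscriber
    // on the caller's thread and returns only after the subscriber has finished.
    public static final class DirectHandoff<T> {
        private final Consumer<T> subscriber;

        public DirectHandoff(Consumer<T> subscriber) {
            this.subscriber = subscriber;
        }

        public void send(T payload) {
            subscriber.accept(payload);
        }
    }

    // Pretends to upload each file name and returns the "remote" paths in input order.
    public static List<String> uploadAll(List<String> fileNames) {
        List<String> uploaded = new ArrayList<>();
        DirectHandoff<List<String>> channel =
                new DirectHandoff<>(files -> files.forEach(f -> uploaded.add("/remote/" + f)));
        channel.send(fileNames);
        // The handoff is synchronous, so every file has been handled at this point.
        return uploaded;
    }

    public static void main(String[] args) {
        System.out.println(uploadAll(Arrays.asList("a.pdf", "b.pdf")));
        // prints [/remote/a.pdf, /remote/b.pdf]
    }
}
```

If the flow were later changed to use an executor or queue channel, this guarantee would no longer hold and the caller would need some other completion signal.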

    2.

    @Bean public IntegrationFlow sftpOutboundFlow(RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate) { ... }

    Since IntegrationFlow is a Consumer functional interface, we can simplify the flow a little by writing it as a lambda over the IntegrationFlowDefinition. During the bean registration phase, the IntegrationFlowBeanPostProcessor converts this inline (lambda) IntegrationFlow to a StandardIntegrationFlow and processes its components. A lambda-based IntegrationFlow definition gets a DirectChannel as its input channel, which is registered in the application context as a bean named sftpOutboundFlow.input in the sample above (flow bean name + ".input"). That's why we use that name for the SFtpOutboundGateway gateway.

    Ref: https://spring.io/blog/2014/11/25/spring-integration-java-dsl-line-by-line-tutorial

    3.

    @Bean public RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate(SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory) {}

    see: Remote directory for sftp outbound gateway with DSL
