Request-reply for SFTP in Spring


I'm trying to implement a request-reply feature over an SFTP connection in Spring. The logic consists of the following steps:

  1. Prepare a file.
  2. Send the file via SFTP.
  3. Pause processing and wait for a response file (poll the same SFTP server, with a timeout).
  4. Once the response file is found on the SFTP server, stop listening, transfer the file, process the data, and resume processing. If the timeout is reached, processing should resume without touching the file (a specific status would be set indicating that there was no response).

I'm quite new to Spring Integration and I'm not sure whether there is any built-in request-reply logic I could use instead of the solution below. My idea for step 2 was to send the file using an inputChannel and a @MessagingGateway as described in the docs, and that piece of code works as expected. Once the file is sent, I try to handle steps 3 and 4 by manually registering an inboundAdapter and implementing the file transformation in the handle method. However, with this approach I would have trouble resuming the processing that was paused after sending the file: the calling logic would simply proceed, and the handle method would run in a separate thread. An additional issue is that the handle method is called for each directory in the local directory, not only when the expected remote file is found.

Sending a file:

    @Bean
    public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        //... setters
        return new CachingSessionFactory<SftpClient.DirEntry>(factory);
    }

    @Bean
    @ServiceActivator(inputChannel = "toSftpChannel")
    public MessageHandler handler() {
        SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
        handler.setRemoteDirectoryExpressionString("/remoteDir");
        handler.setFileNameGenerator(//...);
        return handler;
    }

    @MessagingGateway
    public interface UploadGateway{

         @Gateway(requestChannel = "toSftpChannel")
         void upload(File file);

    }

Listening to a response:

public void createSftpListener(String fileName){
    IntegrationFlow flow = IntegrationFlows
    .from(
         Sftp.inboundAdapter(sftpSessionFactory)
             .preserveTimestamp(true)
             .remoteDirectory("/remoteDir")
             .regexFilter(fileName)
             .localDirectory("/localDir"),
         e -> e.id("sftpInboundAdapter")
               .autoStartup(true)
              .poller(Pollers.fixedDelay(10000))
         )
     .handle(m -> {         // executed for each subdirectory of localDir, not only when fileName is found in remoteDir, as I would expect
           service.processFile(fileName);
           this.flowContext.remove(fileName); //stop listening
           })
     .get();
   
  IntegrationFlowContext.IntegrationFlowRegistration register = 
                    this.flowContext.registration(flow).id(fileName).register(); //register with id equals to unique fileName

}

Processing logic skeleton:

public Status process(){
    prepareFile();
    uploadGateway.sendFile();
    createSftpListener(); // should listen until the file is received or the timeout is reached
    return finalLogic(); // should be executed once the file is received or the timeout is reached
}
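
The blocking behavior wanted from the skeleton above (wait until the reply shows up or a timeout elapses, then continue on the same thread) can be sketched in plain Java, independent of Spring Integration. This is only an illustration of the control flow: `ReplyPoller`, `awaitReply`, `statusOf`, and the `Status` values are hypothetical names, and the `lookup` supplier stands in for whatever actually checks the SFTP server.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical helper, not part of Spring Integration.
public class ReplyPoller {

    /** Outcome of the wait: the reply arrived, or the timeout elapsed first. */
    public enum Status { REPLY_RECEIVED, NO_RESPONSE }

    /**
     * Calls the lookup repeatedly on the calling thread until it yields a value
     * or there is no time left for another poll before the timeout.
     */
    public static <T> Optional<T> awaitReply(Supplier<Optional<T>> lookup,
                                             Duration timeout,
                                             Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            Optional<T> reply = lookup.get();
            if (reply.isPresent()) {
                return reply;
            }
            long remaining = deadline - System.nanoTime();
            if (remaining < pollInterval.toNanos()) {
                return Optional.empty(); // timeout reached, no reply
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return Optional.empty();
            }
        }
    }

    /** Maps the result of the wait to the status set by the final logic. */
    public static Status statusOf(Optional<?> reply) {
        return reply.isPresent() ? Status.REPLY_RECEIVED : Status.NO_RESPONSE;
    }
}
```

Because the caller blocks inside `awaitReply`, `finalLogic()` would only run after the reply was found or the timeout was reached, which is the behavior the asynchronous inbound adapter does not give.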

EDIT:

    @MessagingGateway
    public interface DownloadGateway{

         @Gateway(requestChannel = "fromSftpChannel")
         File get(String filePath);

    }


    @Bean
    @ServiceActivator(inputChannel = "fromSftpChannel")
    public MessageHandler handlerFrom() {
        SftpOutboundGateway handler = new SftpOutboundGateway(sftpSessionFactory(), "get", "payload");
        handler.setLocalDirectory("/localDir/reply");
        handler.setAdviceChain(Arrays.asList(replyRetryAdvice()));
        return handler;
    }

EDIT2:

    @Bean
    public PublishSubscribeChannel pubSubChannel(){
        return new PublishSubscribeChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "pubSubChannel")
    @Order(0) //seems ok
    public MessageHandler handlerTo() {
        SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
        handler.setRemoteDirectoryExpressionString("/remoteDir");
        handler.setFileNameGenerator(//...);
        //  handler.setOrder(0); //doesn't keep the order? 
        return handler;
    }

    @Bean
    @ServiceActivator(inputChannel = "pubSubChannel")
    @Order(1) //seems ok
    public MessageHandler handlerFrom() {
        SftpOutboundGateway handler = new SftpOutboundGateway(sftpSessionFactory(), "get", "'/remoteDir/'+payload.getName().replace('.txt','.out')");
        handler.setLocalDirectory("/localDir/reply");
        handler.setAdviceChain(Arrays.asList(replyRetryAdvice()));
        //  handler.setOrder(1); //doesn't keep the order? 
        return handler;
    }

    @MessagingGateway
    public interface PubSubGateway{
         @Gateway(requestChannel = "pubSubChannel")
         File reqRep(File requestFile);
    }

Solution

  • I would suggest looking into an SftpOutboundGateway in combination with a RequestHandlerRetryAdvice, with some backoff between retries. After sending the file via the SftpMessageHandler, you proceed to the SftpOutboundGateway, which retries the GET operation until it returns a reply file. For this scenario to work, your toSftpChannel must be a PublishSubscribeChannel: the same message with the original file is sent both to the SFTP handler and to a sub-flow with the SftpOutboundGateway that waits for the reply file. This way the original @MessagingGateway call is blocked until the reply arrives or all the retry attempts are exhausted. You would probably also need a return type other than void on your gateway method.
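
A minimal sketch of what the `replyRetryAdvice()` bean referenced in the question's edits might look like, assuming a Spring Integration version whose `RequestHandlerRetryAdvice` is built on Spring Retry. The class name `ReplyRetryConfig`, the attempt count, and the backoff period are illustrative assumptions, not values from the original post; together they give a rough upper bound on the wait of about 6 × 10 s.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

// Hypothetical configuration class; only the bean name comes from the question.
@Configuration
public class ReplyRetryConfig {

    @Bean
    public RequestHandlerRetryAdvice replyRetryAdvice() {
        // Assumed value: wait 10 seconds between GET attempts.
        FixedBackOffPolicy backOff = new FixedBackOffPolicy();
        backOff.setBackOffPeriod(10_000L);

        // Assumed value: give up after 6 attempts in total.
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(6));
        retryTemplate.setBackOffPolicy(backOff);

        // The advice re-invokes the advised handler (here the SFTP GET)
        // each time it throws, until the retry policy is exhausted.
        RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
        advice.setRetryTemplate(retryTemplate);
        return advice;
    }
}
```

With no recovery callback configured, the last failure should propagate to the caller of the gateway once the attempts are exhausted, so the calling code could catch it and set the "no response" status there.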