I'm trying to implement some kind of request-reply feature for SFTP connection in Spring. The logic consists of the following pieces:
I'm quite new to Spring Integration and I'm not sure if there's any build-in request-reply logic I could use instead of the below solution. My idea for 2. was to send a file using inputChannel
and @MessagingGateway
as per the doc and this piece of code is working as expected. Once the file is sent, I'm trying to handle 3. and 4. by manual registration of inboundAdapter
and implementing a file transformation in handle
method. However with this approach I would face issues with resuming processing stopped after sending the file, as the logic would proceed and handle
method would be in a separate thread. Additional issue is that handle
method is called for each directory on local, not only if expected remove file found.
Sending a file:
@Bean
public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
//... setters
return new CachingSessionFactory<SftpClient.DirEntry>(factory);
}
@Bean
@ServiceActivator(inputChannel = "toSftpChannel")
public MessageHandler handler() {
SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
handler.setRemoteDirectoryExpressionString("/remoteDir");
handler.setFileNameGenerator(//...);
return handler;
}
@MessagingGateway
public interface UploadGateway{
@Gateway(requestChannel = "toSftpChannel")
void upload(File file);
}
Listening to a response:
public void createSftpListener(String fileName){
IntegrationFlow flow = IntegrationFlows
.from(
Sftp.inboundAdapter(sftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("/remoteDir")
.regexFilter(fileName)
.localDirectory("/localDir"),
e -> e.id("sftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(10000))
)
.handle(m -> { //executed for each subdirectory of localDir, not only if fileName found in remoteDir, as I would expected
service.processFile(fileName);
this.flowContext.remove(fileName); //stop listening
})
.get();
IntegrationFlowContext.IntegrationFlowRegistration register =
this.flowContext.registration(flow).id(fileName).register(); //register with id equals to unique fileName
}
Processing logic skeleton:
public Status process(){
prepareFile();
uploadGateway.sendFile();
createSftpListener(); //should listen until file received or until timeout reached
return finalLogic(); //should be executed once filen received or timeout reached
}
EDIT:
@MessagingGateway
public interface DownloadGateway{
@Gateway(requestChannel = "fromSftpChannel")
File get(String filePath);
}
@Bean
@ServiceActivator(inputChannel = "fromSftpChannel")
public MessageHandler handlerFrom() {
SftpOutboundGateway handler = new SftpOutboundGateway(sftpSessionFactory(), "get", "payload");
handler.setLocalDirectory("/localDir/reply");
handler.setAdviceChain(Arrays.asList(replyRetryAdvice()));
return handler;
}
EDIT2:
@Bean
public PublishSubscribeChannel pubSubChannel(){
return new PublishSubscribeChannel();
}
@Bean
@ServiceActivator(inputChannel = "pubSubChannel")
@Order(0) //seems ok
public MessageHandler handlerTo() {
SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
handler.setRemoteDirectoryExpressionString("/remoteDir");
handler.setFileNameGenerator(//...);
// handler.setOrder(0); //doesn't keep the order?
return handler;
}
@Bean
@ServiceActivator(inputChannel = "pubSubChannel")
@Order(1) //seems ok
public MessageHandler handlerFrom() {
SftpOutboundGateway handler = new SftpOutboundGateway(sftpSessionFactory(), "get", "'/remoteDir/'+payload.getName().replace('.txt','.out')");
handler.setLocalDirectory("/localDir/reply");
handler.setAdviceChain(Arrays.asList(replyRetryAdvice()));
// handler.setOrder(1); //doesn't keep the order?
return handler;
}
@MessagingGateway
public interface PubSubGateway{
@Gateway(requestChannel = "pubSubChannel")
File reqRep(File requestFile);
}
I would suggest you to look into an SftpOutboundGateway
in combination with a RequestHandlerRetryAdvice
with some backoff in between retries. So, after sending a file via SftpMessageHandler
, you proceed to the SftpOutboundGateway
with that retry loop until a reply the is returned by the GET
operation. For this scenario to work your toSftpChannel
must be a PublishSubscribeChannel
: the same message with an original file would be sent to the SFTP and sub-flow with an SftpOutboundGateway
to wait for a reply file. This way an original @MessagingGateway
would be blocked until all the retry attempts are exhausted. you probably would need also to look into some other than void
return from your gateway method.