I am trying to set up an SFTP file listener using Spring Integration that polls for files matching a pattern and renames each file once it has been processed. I can't hardcode the host, port, username, password, or file pattern, because I want to create the pollers at runtime.
It is mostly working (processing the file, renaming it later), but I am not able to dynamically select the SftpRemoteFileTemplate or sftpSessionFactory at runtime for both the inbound poller and the outbound gateway. I tried using DelegatingSessionFactory but couldn't make it work with the poller, so I implemented my own in-memory cache that stores an SftpRemoteFileTemplate for every watcher I create. The same template should then be used when renaming the file.
Problem: For some reason I can't fetch the value from my header and use it to look up my SftpRemoteFileTemplate. I get the error Property or field 'headers' cannot be found on null, which is probably because headers are not available when this IntegrationFlow is created.
Here's what I was able to get working so far:
@Configuration
@Slf4j
public class InboundFtpConfiguration {

    HashMap<Integer, SftpRemoteFileTemplate> sftpRemoteFileTemplateMap = new HashMap<>();

    private static SessionFactory<ChannelSftp.LsEntry> createSFtpSessionFactory(String host, int port, String username, String password) {
        DefaultSftpSessionFactory sftpSessionFactory = new DefaultSftpSessionFactory();
        sftpSessionFactory.setHost(host);
        sftpSessionFactory.setPort(port);
        sftpSessionFactory.setUser(username);
        sftpSessionFactory.setPassword(password);
        java.util.Properties config = new java.util.Properties();
        config.put("StrictHostKeyChecking", "no");
        sftpSessionFactory.setSessionConfig(config);
        return sftpSessionFactory;
    }

    public String createInboundChannelAdapter(FTPObject ftpObject) {
        if (sftpRemoteFileTemplateMap.get(ftpObject.getKey()) == null) {
            sftpRemoteFileTemplateMap.put(ftpObject.getKey(),
                    new SftpRemoteFileTemplate(createSFtpSessionFactory(ftpObject.getHost(), ftpObject.getPort(), ftpObject.getUsername(), ftpObject.getPassword())
                    ));
        }
        var flow = IntegrationFlows
                .from(Sftp.inboundStreamingAdapter(sftpRemoteFileTemplateMap.get(ftpObject.getKey()))
                                .remoteDirectory(ftpObject.getRemoteDirectory())
                                .regexFilter("(.*).txt"),
                        sourcePollingChannelAdapterSpec -> {
                            sourcePollingChannelAdapterSpec.poller(pollerFactory -> pollerFactory.fixedDelay(5000));
                        }
                )
                .transform(Transformers.fromStream())
                .enrichHeaders(headerEnricherSpec -> {
                    headerEnricherSpec.headerExpression("TENANTID", String.valueOf(TenantContext.getTenantId()));
                    headerEnricherSpec.headerExpression("key", String.valueOf(ftpObject.getKey()));
                })
                .publishSubscribeChannel(subFlow -> subFlow
                        .subscribe(flow1 -> flow1.handle(h -> {
                            System.out.println("(1) Do Something First");
                        }))
                        //.subscribe(flow2 -> flow2.handle((p,h) -> renameFile(m.getHeaders())))
                        // .subscribe(flow2 -> flow2.handle(renameFile(m.getHeaders())))
                        .subscribe(flow2 -> flow2.handle(renameFile()))
                )
                .log(LoggingHandler.Level.INFO)
                .get();
        return flowContext
                .registration(flow)
                .autoStartup(false)
                .register()
                .getId();
    }

    public MessageHandler renameFile() {
        //System.out.println("In rename file key is: "+new SpelExpressionParser().parseExpression("headers['key']").getValue());
        //ERROR generating line
        SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpRemoteFileTemplateMap.get((Integer) new SpelExpressionParser().parseExpression("headers['key']").getValue()),
                AbstractRemoteFileOutboundGateway.Command.MV.getCommand(), "headers['file_remoteDirectory'] + headers['file_remoteFile']");
        sftpOutboundGateway.setRenameExpressionString("headers['file_remoteDirectory'] + headers['file_remoteFile']+'.processed'");
        sftpOutboundGateway.setRequiresReply(false);
        sftpOutboundGateway.setOutputChannelName("nullChannel");
        sftpOutboundGateway.setOrder(Ordered.LOWEST_PRECEDENCE);
        sftpOutboundGateway.setAsync(true);
        return sftpOutboundGateway;
        /* return Sftp.outboundGateway(sftpRemoteFileTemplateMap.get(1),
                AbstractRemoteFileOutboundGateway.Command.MV.getCommand(),"headers['file_remoteDirectory'] + headers['file_remoteFile']")
                .renameExpression("headers['file_remoteDirectory'] + headers['file_remoteFile']+'.processed'")
                .get();*/
    }
}
If I use the commented-out handle((p,h) -> renameFile(h)) variant to pass the headers (which contain the key for the SFTP template), the SFTP file renaming doesn't work for some reason, and I don't know how else to reuse the session. Any pointers, please?
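This line is wrong:
new SftpOutboundGateway(sftpRemoteFileTemplateMap.get((Integer) new SpelExpressionParser().parseExpression("headers['key']").getValue())
You call getValue() on the freshly parsed SpEL expression while the flow is being built. At that point there is no message, so the expression is evaluated against a null root object, which is why you get "Property or field 'headers' cannot be found on null". It is never evaluated at runtime against the message.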
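To make the build-time versus runtime distinction concrete, here is a minimal plain-Java sketch with no Spring involved. Everything in it is a hypothetical stand-in: TEMPLATES plays the role of the template cache, a String plays the role of an SftpRemoteFileTemplate, and a Map plays the role of the message headers. It only illustrates why the lookup has to be deferred until a message exists.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class DeferredLookupDemo {

    // Hypothetical stand-in for the per-watcher cache; String replaces SftpRemoteFileTemplate.
    static final Map<Integer, String> TEMPLATES = new HashMap<>();

    // Build-time lookup: while the flow is being assembled there is no message,
    // so there are no headers to read (modelled here by passing null).
    static String resolveAtBuildTime(Map<String, Object> headers) {
        return TEMPLATES.get((Integer) headers.get("key")); // NullPointerException when headers is null
    }

    // Deferred lookup: hand back a function that is applied to each message's headers later.
    static Function<Map<String, Object>, String> resolvePerMessage() {
        return headers -> TEMPLATES.get((Integer) headers.get("key"));
    }

    public static void main(String[] args) {
        TEMPLATES.put(1, "template-for-host-1");

        // Creating the resolver is safe at build time because nothing is evaluated yet.
        Function<Map<String, Object>, String> resolver = resolvePerMessage();

        // Later, when a message arrives, its headers are available.
        Map<String, Object> headers = new HashMap<>();
        headers.put("key", 1);
        System.out.println(resolver.apply(headers)); // prints "template-for-host-1"
    }
}
```

The same shape applies to the gateway in the question: whatever selects the template or session factory has to run per message, not once when renameFile() is called.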
The SftpOutboundGateway can be configured with a DelegatingSessionFactory. You just need to call its public Message<?> setThreadKey(Message<?> message, Object key) method before handing the message to the SftpOutboundGateway.
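As the method name suggests, the key is bound to the current thread, so it must be set on the same thread that then calls the gateway, and it is a good idea to clear it afterwards (DelegatingSessionFactory also has clearThreadKey for that). The following is a rough plain-Java model of that pattern, not Spring's implementation: DelegatingFactory, register, getSession and renameWith are hypothetical names, and a String stands in for an SFTP session. In the real flow the key would come from headers['key'].

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

final class ThreadKeyedFactoryDemo {

    // Hypothetical model of a delegating factory: picks a delegate by a per-thread key.
    static final class DelegatingFactory<T> {
        private final Map<Object, Supplier<T>> delegates = new ConcurrentHashMap<>();
        private final ThreadLocal<Object> threadKey = new ThreadLocal<>();

        void register(Object key, Supplier<T> delegate) {
            delegates.put(key, delegate);
        }

        void setThreadKey(Object key) {
            threadKey.set(key);
        }

        void clearThreadKey() {
            threadKey.remove();
        }

        T getSession() {
            Object key = threadKey.get();
            Supplier<T> delegate = (key == null) ? null : delegates.get(key);
            if (delegate == null) {
                throw new IllegalStateException("No delegate for key " + key);
            }
            return delegate.get();
        }
    }

    // Set the key, do the work on the same thread, and always clear the key afterwards.
    static String renameWith(DelegatingFactory<String> factory, Object key, String file) {
        factory.setThreadKey(key);
        try {
            return factory.getSession() + ": mv " + file + " " + file + ".processed";
        } finally {
            factory.clearThreadKey();
        }
    }

    public static void main(String[] args) {
        DelegatingFactory<String> factory = new DelegatingFactory<>();
        factory.register(1, () -> "session@host-1");
        factory.register(2, () -> "session@host-2");

        // One shared factory serves both watchers; the key selects the target per call.
        System.out.println(renameWith(factory, 1, "/in/a.txt"));
        System.out.println(renameWith(factory, 2, "/in/b.txt"));
    }
}
```

With this approach a single gateway can be shared by every watcher, which replaces the per-key template map. Because the key lives in a thread-local, handing the message to another thread between setting the key and calling the gateway would lose the selection.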