I have a question about handling the payload in Ftp.outboundAdapter.
I have a Gateway method with @Header parameters, where I pass the branch as a header along with the file and its name:
@Gateway(requestChannel = "toFtpChannel")
void sendToFtp(byte[] file, @Header(FileHeaders.FILENAME) String fileName, @Header("branch") String branch);
In my integration flow I receive the data from the channel and, based on the branch, create the session factory so that the file is dropped on the correct branch.
The problem is in my handle method: if I use the lambda to get the branch from the GenericMessage, the file is not sent to the destination and no error is raised; it seems to do nothing. If I remove the lambda and hardcode the branch in ftpSessionFactory, the file is sent. I am probably missing a small tweak that I cannot find.
public SessionFactory<FTPFile> ftpSessionFactory(String branch) {
    DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
    sf.setHost(branch);
    sf.setUsername(appProperties.getFtp().getBranchFtpUsername());
    sf.setPassword(appProperties.getFtp().getBranchFtpPassword());
    sf.setPort(appProperties.getFtp().getBranchFtpPort());
    return sf;
}
@Bean
public IntegrationFlow localToFtpFlow() {
    return IntegrationFlows.from("toFtpChannel")
            .handle(m -> {
                log.info("Sending to branch {} ", m);
                Ftp.outboundAdapter(ftpSessionFactory(m.getHeaders().get("branch", String.class)))
                        .remoteDirectory(appProperties.getFtp().getBranchFtpLocation())
                        .autoCreateDirectory(true);
            })
            .get();
}
@Bean
public IntegrationFlow error() {
    return IntegrationFlows.from(ERROR_CHANNEL)
            .transform("headers.file_originalFile")
            .handle(Ftp.outboundAdapter(ftpSessionFactory("fra"), FileExistsMode.REPLACE)
                            .useTemporaryFileName(true)
                            .autoCreateDirectory(true)
                            .remoteDirectory(appProperties.getFtp().getBranchFtpErrorLocation()),
                    e -> e.advice(expressionAdvice()))
            .get();
}
This code is wrong:
.handle( m -> {
log.info("Sending to branch {} ", m)
Ftp.outboundAdapter(
You cannot create a channel adapter on the fly: it has to be registered as a singleton during application initialization. In your lambda the adapter is only built and then discarded; it is never handed the message, which is why nothing is sent and nothing fails.
For your "branching" logic you need to look into a DelegatingSessionFactory: https://docs.spring.io/spring-integration/docs/current/reference/html/ftp.html#ftp-dsf. You would call DelegatingSessionFactory.setThreadKey(Message<?> message, Object key) in a handle() step to bind the branch from the header to the current thread, and place that step before a regular handle(Ftp.outboundAdapter()) definition.
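
A rough sketch of how that could look as a configuration fragment, not a definitive implementation. Assumptions that are not in the question: the class name BranchFtpConfig, the second branch host "ber", the placeholder credentials, port and "/upload" directory (standing in for the appProperties values), and the use of a publish-subscribe step to clear the thread key after the transfer. It also assumes the flow stays on the calling thread between the two handle() steps, since the key is thread-bound.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.net.ftp.FTPFile;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.file.remote.session.DelegatingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.ftp.dsl.Ftp;
import org.springframework.integration.ftp.session.DefaultFtpSessionFactory;
import org.springframework.messaging.Message;

@Configuration
public class BranchFtpConfig {

    // One delegate per branch, all created once at startup.
    // "fra" comes from the question; "ber" is a hypothetical second branch.
    @Bean
    public DelegatingSessionFactory<FTPFile> delegatingSf() {
        Map<Object, SessionFactory<FTPFile>> factories = new HashMap<>();
        factories.put("fra", ftpSessionFactory("fra"));
        factories.put("ber", ftpSessionFactory("ber"));
        // The second argument is the fallback used when no key matches.
        return new DelegatingSessionFactory<>(factories, factories.get("fra"));
    }

    @Bean
    public IntegrationFlow localToFtpFlow(DelegatingSessionFactory<FTPFile> delegatingSf) {
        return IntegrationFlows.from("toFtpChannel")
                // Bind the "branch" header to the current thread; the message is returned unchanged.
                .handle(Message.class, (message, headers) ->
                        delegatingSf.setThreadKey(message, headers.get("branch")))
                .publishSubscribeChannel(s -> s
                        // Singleton adapter; the delegate is picked per message via the thread key.
                        .subscribe(f -> f.handle(Ftp.outboundAdapter(delegatingSf)
                                .remoteDirectory("/upload") // placeholder directory
                                .autoCreateDirectory(true)))
                        // Runs after a successful transfer and unbinds the key from the thread.
                        .subscribe(f -> f.handle(m -> delegatingSf.clearThreadKey())))
                .get();
    }

    // Same shape as the factory method in the question, with placeholder settings.
    private SessionFactory<FTPFile> ftpSessionFactory(String branch) {
        DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
        sf.setHost(branch);
        sf.setUsername("user");     // placeholder
        sf.setPassword("secret");   // placeholder
        sf.setPort(21);             // placeholder
        return sf;
    }
}
```

The gateway from the question stays as it is: sendToFtp(bytes, "report.csv", "ber") would go through the "ber" delegate. If the transfer throws, the second subscriber is not reached and the key stays bound until the next setThreadKey call on that thread, so a request-handler advice may be a better place to clear it when failures matter.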