Spring integration Ftp.outboundAdapter


I have a question about handling the payload in Ftp.outboundAdapter.

I have a gateway method whose parameters are annotated with @Header, and I use it to pass the branch along with the file, the file name, and so on.

@Gateway(requestChannel = "toFtpChannel")
void sendToFtp(byte[] file, @Header(FileHeaders.FILENAME) String fileName, @Header("branch") String branch);

In my integration flow I receive the data from the channel and, based on the branch, create the session factory so that the file is dropped on the correct branch.

The issue is in my handle method. If I use a lambda to read the branch from the generic message, the file is not sent to the destination and no error is raised; it seems to do nothing at all. If I remove the lambda and hardcode the branch in the call to ftpSessionFactory, the file is sent. So it probably needs a small tweak that I am not able to find.

public SessionFactory<FTPFile> ftpSessionFactory(String branch) {
    DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
    sf.setHost(branch);
    sf.setUsername(appProperties.getFtp().getBranchFtpUsername());
    sf.setPassword(appProperties.getFtp().getBranchFtpPassword());
    sf.setPort(appProperties.getFtp().getBranchFtpPort());
    return sf;
}

@Bean
public IntegrationFlow localToFtpFlow() {
    return IntegrationFlows.from("toFtpChannel")
            .handle(m -> {
                log.info("Sending to branch {} ", m);
                Ftp.outboundAdapter(ftpSessionFactory(m.getHeaders().get("branch", String.class)))
                        .remoteDirectory(appProperties.getFtp().getBranchFtpLocation())
                        .autoCreateDirectory(true);
            })
            .get();
}

@Bean
public IntegrationFlow error() {
    return IntegrationFlows.from(ERROR_CHANNEL)
            .transform("headers.file_originalFile")
            .handle(Ftp.outboundAdapter(ftpSessionFactory("fra"), FileExistsMode.REPLACE)
                            .useTemporaryFileName(true)
                            .autoCreateDirectory(true)
                            .remoteDirectory(appProperties.getFtp().getBranchFtpErrorLocation())
                    , e -> e.advice(expressionAdvice()))
            .get();
}

Solution

  • This code is wrong:

    .handle(m -> {
        log.info("Sending to branch {} ", m);
        Ftp.outboundAdapter(

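    You cannot create a channel adapter on the fly: it has to be registered as a singleton bean during application initialization. Inside the lambda, Ftp.outboundAdapter(...) only builds an adapter specification that is immediately discarded. The message is never passed to it, which is why no file is sent and no error is raised.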

    For your "branching" logic, look into DelegatingSessionFactory: https://docs.spring.io/spring-integration/docs/current/reference/html/ftp.html#ftp-dsf. Configure the outbound adapter once with the delegating factory. Then, in a handle() step placed before the regular handle(Ftp.outboundAdapter(...)) definition, call DelegatingSessionFactory.setThreadKey(Message<?> message, Object key) with the branch pulled from the message header. The key is bound to the current thread, so both steps have to run on the same thread.
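
To make the thread-key idea concrete, here is a minimal plain-Java sketch of the mechanism, not Spring's actual implementation. All types in it (SimpleSessionFactory, ThreadKeyedSessionFactory, UploadHandler, BranchUploadDemo) are hypothetical stand-ins, and the "ber" branch is invented for illustration. It assumes a handler that is built once and a per-thread key that selects the real factory when a session is opened.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in for a session factory; not a Spring type.
interface SimpleSessionFactory {
    String openSession(); // returns a description of the target host
}

// Simplified model of the delegating idea: a per-thread key picks the real factory.
final class ThreadKeyedSessionFactory implements SimpleSessionFactory {
    private final ThreadLocal<String> threadKey = new ThreadLocal<>();
    private final Map<String, SimpleSessionFactory> cache = new ConcurrentHashMap<>();
    private final Function<String, SimpleSessionFactory> factoryForBranch;
    private final SimpleSessionFactory defaultFactory;

    ThreadKeyedSessionFactory(Function<String, SimpleSessionFactory> factoryForBranch,
                              SimpleSessionFactory defaultFactory) {
        this.factoryForBranch = factoryForBranch;
        this.defaultFactory = defaultFactory;
    }

    void setThreadKey(String key) {
        threadKey.set(key);
    }

    void clearThreadKey() {
        threadKey.remove();
    }

    @Override
    public String openSession() {
        String key = threadKey.get();
        // No key bound to this thread: fall back to the default factory.
        SimpleSessionFactory target = (key == null)
                ? defaultFactory
                : cache.computeIfAbsent(key, factoryForBranch);
        return target.openSession();
    }
}

// Plays the role of the outbound adapter: created once, reused for every message.
final class UploadHandler {
    private final SimpleSessionFactory sessionFactory;

    UploadHandler(SimpleSessionFactory sessionFactory) {
        this.sessionFactory = sessionFactory;
    }

    String upload(String fileName) {
        return sessionFactory.openSession() + "/" + fileName;
    }
}

final class BranchUploadDemo {
    // Bind the branch to the thread, run the shared handler, then unbind.
    static String send(ThreadKeyedSessionFactory dsf, UploadHandler handler,
                       String branch, String fileName) {
        dsf.setThreadKey(branch);
        try {
            return handler.upload(fileName);
        } finally {
            dsf.clearThreadKey();
        }
    }

    public static void main(String[] args) {
        ThreadKeyedSessionFactory dsf = new ThreadKeyedSessionFactory(
                branch -> () -> "ftp://" + branch, () -> "ftp://default");
        UploadHandler handler = new UploadHandler(dsf);
        System.out.println(send(dsf, handler, "fra", "report.csv")); // ftp://fra/report.csv
        System.out.println(send(dsf, handler, "ber", "report.csv")); // ftp://ber/report.csv
        System.out.println(handler.upload("orphan.csv"));            // ftp://default/orphan.csv
    }
}
```

In the real flow, the step that sets the key and the Ftp.outboundAdapter step would have to run on the same thread, and the key should presumably be cleared once the send has completed, which is what the try/finally models here.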