Search code examples
spring-integrationspring-integration-dslspring-integration-sftp

How to pass parameter to MessageHandler during runtime


I am trying to set up a SFTP file listener using Spring Integration which will poll for files based on pattern and rename the file once processed. I can't hardcode Host, port, username and password nor the file pattern etc because I want to create poller at runtime.

It is mostly working (processing file, renaming it later) but I am not able to dynamically select SftpRemoteFileTemplate or sftpSessionFactory at runtime for both the Inbound Poller and OutboundGateway. I tried using DelegatingSessionFactory but couldn't make it work with poller so I tried to implement my own in-memory cache which will store SftpRemoteFileTemplate for every watcher I create. Then it should use the same sftptemplate while renaming file.

Problem: For some reason I can't fetch value from my Header and use it to get my SFTPRemoteTemplate. I get error Property or field 'headers' cannot be found on null which is probably because headers are not available when creating this IntegrationFlow.

Here's what I was able to get working so far -

@Configuration
@Slf4j
public class InboundFtpConfiguration {

HashMap<Integer, SftpRemoteFileTemplate> sftpRemoteFileTemplateMap = new HashMap<>();

private static SessionFactory<ChannelSftp.LsEntry> createSFtpSessionFactory(String host, int port, String username, String password) {
    DefaultSftpSessionFactory sftpSessionFactory = new DefaultSftpSessionFactory();
    sftpSessionFactory.setHost(host);
    sftpSessionFactory.setPort(port);
    sftpSessionFactory.setUser(username);
    sftpSessionFactory.setPassword(password);
    
    java.util.Properties config = new java.util.Properties();
    config.put("StrictHostKeyChecking", "no");
    sftpSessionFactory.setSessionConfig(config);
    return sftpSessionFactory;
}


public String createInboundChannelAdapter(FTPObject ftpObject) {

    if(sftpRemoteFileTemplateMap.get(ftpObject.getKey()) == null){
        sftpRemoteFileTemplateMap.put(ftpObject.getKey(),
                new SftpRemoteFileTemplate(createSFtpSessionFactory(ftpObject.getHost(), ftpObject.getPort(), ftpObject.getUsername(), ftpObject.getPassword())
        ));
    }


    var flow = IntegrationFlows
            .from(Sftp.inboundStreamingAdapter(sftpRemoteFileTemplateMap.get(ftpObject.getKey()))
                            .remoteDirectory(ftpObject.getRemoteDirectory())
                            .regexFilter("(.*).txt"),
                    sourcePollingChannelAdapterSpec -> {
                        sourcePollingChannelAdapterSpec.poller(pollerFactory -> pollerFactory.fixedDelay(5000));
                    }
            )
            .transform(Transformers.fromStream())
            .enrichHeaders(headerEnricherSpec -> {
               headerEnricherSpec.headerExpression("TENANTID", String.valueOf(TenantContext.getTenantId()));
               headerEnricherSpec.headerExpression("key", String.valueOf(ftpObject.getKey()));
            })
            .publishSubscribeChannel(subFlow -> subFlow
                    .subscribe(flow1 -> flow1.handle(h ->{
                        System.out.println("(1) Do Something First");
                    }))
                
                   //.subscribe(flow2 -> flow2.handle((p,h) -> renameFile(m.getHeaders())))
                   // .subscribe(flow2 -> flow2.handle(renameFile(m.getHeaders())))
                    .subscribe(flow2 -> flow2.handle(renameFile()))
            )
            .log(LoggingHandler.Level.INFO)
            .get();


    return flowContext
            .registration(flow)
            .autoStartup(false)
            .register()
            .getId();
}

public MessageHandler renameFile(){
  
    //System.out.println("In rename file key is: "+new SpelExpressionParser().parseExpression("headers['key']").getValue());

    //ERROR generating line
    SftpOutboundGateway sftpOutboundGateway = new  SftpOutboundGateway(sftpRemoteFileTemplateMap.get((Integer) new SpelExpressionParser().parseExpression("headers['key']").getValue()),
            AbstractRemoteFileOutboundGateway.Command.MV.getCommand(),"headers['file_remoteDirectory'] + headers['file_remoteFile']");



    sftpOutboundGateway.setRenameExpressionString( "headers['file_remoteDirectory'] + headers['file_remoteFile']+'.processed'");
    sftpOutboundGateway.setRequiresReply(false);
    sftpOutboundGateway.setOutputChannelName("nullChannel");
    sftpOutboundGateway.setOrder(Ordered.LOWEST_PRECEDENCE);
    sftpOutboundGateway.setAsync(true);
    return sftpOutboundGateway;

   /* return Sftp.outboundGateway(sftpRemoteFileTemplateMap.get(1),
            AbstractRemoteFileOutboundGateway.Command.MV.getCommand(),"headers['file_remoteDirectory'] + headers['file_remoteFile']")
            .renameExpression("headers['file_remoteDirectory'] + headers['file_remoteFile']+'.processed'")
            .get();*/

}
}

So if I use the commented handle((p,h) -> rename(h)) method to pass header parameters(which has key to sftptemplate) the SFTP file renaming doesn't work for some reason. And I don't know how else to reuse session. Any pointers please?


Solution

  • This line is wrong:

    new SftpOutboundGateway(sftpRemoteFileTemplateMap.get((Integer) new SpelExpressionParser().parseExpression("headers['key']").getValue())
    

    You call get value on just parsed SpEL expression. you don't do that at runtime against the message.

    The SftpOutboundGateway can be configured with a DelegatingSessionFactory.

    You just need to call its public Message<?> setThreadKey(Message<?> message, Object key) { before handling message to the SftpOutboundGateway.

    See also: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#context-holder-advice