spring-boot, spring-integration, spring-integration-sftp

Spring Integration: how to access the returned values from last Subscriber


I'm trying to implement an SFTP upload of two files that has to happen in a certain order: first a PDF file and then, after that upload has succeeded, a text file with meta information about the PDF.

I followed the advice in this thread, but can't get it to work properly.

My Spring Boot Configuration:

@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
    final DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
    final Properties jschProps = new Properties();
    jschProps.put("StrictHostKeyChecking", "no");
    jschProps.put("PreferredAuthentications", "publickey,password");
    factory.setSessionConfig(jschProps);

    factory.setHost(sftpHost);
    factory.setPort(sftpPort);
    factory.setUser(sftpUser);
    if (sftpPrivateKey != null) {
        factory.setPrivateKey(sftpPrivateKey);
        factory.setPrivateKeyPassphrase(sftpPrivateKeyPassphrase);
    } else {
        factory.setPassword(sftpPasword);
    }
    factory.setAllowUnknownKeys(true);
    return new CachingSessionFactory<>(factory);
}

@Bean
@BridgeTo
public MessageChannel toSftpChannel() {
    return new PublishSubscribeChannel();
}


@Bean
@ServiceActivator(inputChannel = "toSftpChannel")
@Order(0)
public MessageHandler handler() {
    final SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
    handler.setRemoteDirectoryExpression(new LiteralExpression(sftpRemoteDirectory));
    handler.setFileNameGenerator(message -> {
        if (message.getPayload() instanceof byte[]) {
            return (String) message.getHeaders().get("filename");
        } else {
            throw new IllegalArgumentException("File expected as payload.");
        }
    });
    return handler;
}

@ServiceActivator(inputChannel = "toSftpChannel")
@Order(1)
public String transferComplete(@Payload byte[] file, @Header("filename") String filename) {
    return "The SFTP transfer complete for file: " + filename;
}

@MessagingGateway
public interface UploadGateway {

    @Gateway(requestChannel = "toSftpChannel")
    String upload(@Payload byte[] file, @Header("filename") String filename);

}

My Test Case:

final String pdfStatus = uploadGateway.upload(content, documentName);
log.info("Upload of {} completed, {}.", documentName, pdfStatus);

From the Gateway's upload call I expect to get the String confirming the upload, e.g. "The SFTP transfer complete for file:...", but instead I get the content of the uploaded file as a byte[]:

Upload of 123456789.1.pdf completed, 37,80,68,70,45,49,46,54,13,37,-30,-29,-49,-45,13,10,50,55,53,32,48,32,111,98,106,13,60,60,47,76,105,110,101,97,114,105,122,101,100,32,49,47,76,32,50,53,52,55,49,48,47,79,32,50,55,55,47,69,32,49,49,49,55,55,55,47,78,32,49,47,84,32,50,53,52,51,53,57,47,72,32,91,32,49,49,57,55,32,53,51,55,93,62,62,13,101,110,100,111,98,106,13,32,32,32,32,32,32,32,32,32,32,32,32,13,10,52,55,49,32,48,32,111,98,106,13,60,60,47,68,101,99,111,100,101,80,97,114,109,115,60,60,47,67,111,108,117,109,110,115,32,53,47,80,114,101,100,105,99,116,111,114,32,49,50,62,62,47,70,105,108,116,101,114,47,70,108,97,116,101,68,101,99,111,100,101,47,73,68,91,60,57,66,53,49,56,54,69,70,53,66,56,66,49,50,52,49,65,56,50,49,55,50,54,56,65,65,54,52,65,57,70,54,62,60,68,52,50,68,51,55,54,53,54,65,67,48,55,54,52,65,65,53,52,66,52,57,51,50,56,52,56,68,66 etc.

What am I missing?


Solution

  • I think @Order(0) doesn't work together with the @Bean.

    To fix it, set the order in that bean definition instead:

    final SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
    handler.setOrder(0);
    

    See the Reference Manual for more info:

    When using these annotations on consumer @Bean definitions, if the bean definition returns an appropriate MessageHandler (depending on the annotation type), attributes such as outputChannel, requiresReply etc, must be set on the MessageHandler @Bean definition itself.

    In other words: if you can use a setter, you have to. We don't process the annotations in this case because there is no guarantee which one should take precedence. So, to avoid that confusion, we have left the setters as the only choice.

    UPDATE

    I see your problem and it is here:

    @Bean
    @BridgeTo
    public MessageChannel toSftpChannel() {
        return new PublishSubscribeChannel();
    }
    

    That is confirmed by the logs:

    Adding {bridge:dmsSftpConfig.toSftpChannel.bridgeTo} as a subscriber to the 'toSftpChannel' channel
    Channel 'org.springframework.context.support.GenericApplicationContext@b3d0f7.toSftpChannel' has 3 subscriber(s).
    started dmsSftpConfig.toSftpChannel.bridgeTo
    

    So there really is one more subscriber on toSftpChannel: a BridgeHandler whose output goes to the replyChannel header. Its default order is Ordered.LOWEST_PRECEDENCE (the field is declared as private volatile int order = Ordered.LOWEST_PRECEDENCE;). This bridge becomes the first subscriber, and it is the one that returns the byte[], simply because that is the payload of the request.

    You need to decide if you really need such a bridge. There is no workaround for the @Order though...
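
To make the effect described above easier to see, here is a minimal plain-Java sketch. It does not use Spring Integration at all: the class FirstReplyWinsSketch, the ReplySlot type and the three subscriber constants are hypothetical stand-ins invented for illustration. It assumes, as a simplification, that subscribers are called in list order, that the bridge is first (as stated above), that the upload handler is one-way, and that the caller only ever sees the first reply.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;

// Toy model only: none of these types are Spring Integration classes.
class FirstReplyWinsSketch {

    /** Stand-in for the gateway's reply channel: keeps only the first reply it is sent. */
    static final class ReplySlot {
        private Object reply;

        void send(Object candidate) {
            if (reply == null) {
                reply = candidate; // later replies are ignored in this model
            }
        }

        Object get() {
            return reply;
        }
    }

    /** Stand-in for the bridge: echoes the request payload back as the reply. */
    static final BiFunction<byte[], String, Optional<Object>> BRIDGE =
            (payload, filename) -> Optional.of(payload);

    /** Stand-in for the SFTP handler: performs the upload and produces no reply. */
    static final BiFunction<byte[], String, Optional<Object>> UPLOAD =
            (payload, filename) -> Optional.empty();

    /** Stand-in for transferComplete(...) from the question. */
    static final BiFunction<byte[], String, Optional<Object>> CONFIRM =
            (payload, filename) ->
                    Optional.of("The SFTP transfer complete for file: " + filename);

    /** Publish-subscribe dispatch: every subscriber gets the same request, in list order. */
    static Object publish(List<BiFunction<byte[], String, Optional<Object>>> subscribers,
                          byte[] payload, String filename) {
        ReplySlot slot = new ReplySlot();
        for (BiFunction<byte[], String, Optional<Object>> subscriber : subscribers) {
            subscriber.apply(payload, filename).ifPresent(slot::send);
        }
        return slot.get();
    }

    static Object withBridge(byte[] payload, String filename) {
        return publish(Arrays.asList(BRIDGE, UPLOAD, CONFIRM), payload, filename);
    }

    static Object withoutBridge(byte[] payload, String filename) {
        return publish(Arrays.asList(UPLOAD, CONFIRM), payload, filename);
    }

    public static void main(String[] args) {
        byte[] pdf = {37, 80, 68, 70};
        System.out.println(withBridge(pdf, "123456789.1.pdf") instanceof byte[]);
        System.out.println(withoutBridge(pdf, "123456789.1.pdf"));
    }
}
```

In this model the call with the bridge hands the original byte[] back to the caller, while the call without it returns the confirmation String. That mirrors the advice above: if the bridge is not needed, dropping @BridgeTo from the channel bean leaves transferComplete as the only subscriber that produces a reply.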