Search code examples
javalambdaspring-integrationspring-integration-dslspring-integration-aws

Why is my handler method not triggered when defined as a lambda?


I am defining an IntegrationFlow to stream from SFTP to S3 with the DSL syntax this way :

return IntegrationFlows.from(Sftp.inboundStreamingAdapter(remoteFileTemplate)
                        .remoteDirectory("remoteDirectory"),
                e -> e.poller(Pollers.fixedDelay(POLL, TimeUnit.SECONDS)))
                .transform(new StreamTransformer())
                .handle(s3UploadMessageHandler(outputFolderPath, "headers['file_remoteFile']")) // Upload on S3
                .get();
private S3MessageHandler s3UploadMessageHandler(String folderPath, String spelFileName) {
        S3MessageHandler s3MessageHandler = new S3MessageHandler(amazonS3, s3ConfigProperties.getBuckets().getCardManagementData());
        s3MessageHandler.setKeyExpression(new SpelExpressionParser().parseExpression(String.format("'%s/'.concat(%s)", folderPath, spelFileName)));
        s3MessageHandler.setCommand(S3MessageHandler.Command.UPLOAD);
        return s3MessageHandler;
    }

And it works as intended : the file is well uploaded to my S3 bucket. However, I would like to avoid SPEL syntax, and inject headers from the message to the s3uploadMessageHandler method, this way I could use a simple ValueExpression to set the keyExpression in the s3UploadMessageHandler method. To do this, I changed

handle(s3UploadMessageHandler(outputFolderPath, "headers['file_remoteFile']")) // Upload on S3

to

handle(m -> s3UploadMessageHandler(outputFolderPath, (String) m.getHeaders().get("file_remoteFile"))) // Upload on S3

But now this handler doesn't seem to be triggered anymore. There is no errors in the logs, and I know from the logs that the SFTP polling is still working.

I tried to find the reason behind this, and I saw that when entering the handle method in IntegrationFlowdefinition.java, the messageHandler class type is different : it's an S3MessageHandler when called without lambda, and a MyCallingClass$lambda when calling with a lambda expression.

What did I miss to make my scenario working ?


Solution

  • There are two ways to handle a message. One is via a MessageHandler implementation - this is the most efficient approach and that's done in the framework for channel adapter implementation, like that S3MessageHandler. Another way is a POJO method invocation - this is the most user-friendly approach when you don't need to worry about any framework interfaces.

    So, when you use it like this .handle(s3UploadMessageHandler(...)) you refer to a MessageHandler and the framework knows that a bean for that MessageHandler has to be registered since your s3UploadMessageHandler() is not a @Bean.

    When you use it as a lambda, the framework treats it as a POJO method invocation and there is a bean registered for the MethodInvokingMessageHandler, but not your S3MessageHandler.

    Anyway, even if you change your s3UploadMessageHandler() to be a @Bean method it is not going to work because you don't let the framework to call the S3MessageHandler.handleMessage(). What you do here is just call that private method at runtime to create an S3MessageHandler instance against every request message: the MethodInvokingMessageHandler calls your lambda in its handleMessage() and that's all - nothing is going to happen with S3.

    The ValueExpression cannot help you here because you need to evaluate a destination file against every single request message. Therefore you need a runtime expression. There is indeed nothing wrong with the new SpelExpressionParser().parseExpression(). Just because we don't have a choice and have to have only single stateless S3MessageHandler and don't recreate it at runtime on every request like you try to achieve with that suspicious lambda and ValueExpression.