Spring Integration DSL: lambda to return a Message<T> in handle method, e.g. with DelegatingSessionFactory?


Motivation: I need to set the threadKey for a DelegatingSessionFactory before I route to an Sftp outbound gateway and unset the threadKey afterwards.

Depending on the tenant, I need to use a different SFTP user account. The user accounts are configured in my application.yml, and I do not want to write a separate route for each new tenant.

public IntegrationFlow aDynamicSftpFlow() {
    return f -> f
        .handle(tenantSessionDefine()) // how can I use a lambda instead?
        .handle(Sftp.outboundGateway(delegatingSessionFactory, ...))
        .handle(...); // undefine sftp session
}

Setting the threadKey requires a Message<?>, not just payload and headers. So I use a bean because it takes a message:

public class TenantSessionDefine {

    private final DelegatingSessionFactory delegatingSessionFactory;

    public TenantSessionDefine(DelegatingSessionFactory delegatingSessionFactory) {
        this.delegatingSessionFactory = delegatingSessionFactory;
    }

    public Message<?> defineSession(Message<?> message) {
        // the thread key is used by the SessionFactoryLocator
        return delegatingSessionFactory.setThreadKey(message,
            message.getHeaders().get("tenantId", String.class));
    }
}

I would like to write that as a lambda, as in

.handle(message -> delegatingSessionFactory.setThreadKey(message,
    message.getPayload().getTenant()))

but that is not so easy. The handle() variant whose lambda takes a Message<T> ends the flow, because that lambda is a void function (the MessageHandler functional interface). The other lambda form, GenericHandler, does not end the flow, but it receives the payload and the headers rather than the message.

This is just an example; every now and then I wish I could use handle() with a lambda that receives the whole message without ending the flow. How can I do that?

Update

The DelegatingSessionFactory is not a particularly well-suited example. Since the thread key has to be set before the SFTP invocation and cleared after it, an advice fits better than defining a handler before and after the call.
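
The set-then-clear idea from this update can be sketched without any Spring types. This is only a minimal model: ThreadKeyedSessions and TenantAdvice are hypothetical classes I made up for illustration, not part of Spring Integration. It shows why wrapping the call in try/finally keeps the thread key from leaking when the wrapped call fails. In a real flow the same shape would presumably go into a request handler advice (for example a subclass of AbstractRequestHandlerAdvice) attached to the gateway endpoint, which is not shown here.

```java
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for a delegating session factory: it resolves the
// SFTP user for whatever key is currently bound to the calling thread.
final class ThreadKeyedSessions {
    private final Map<String, String> usersByTenant;
    private final ThreadLocal<String> threadKey = new ThreadLocal<>();

    ThreadKeyedSessions(Map<String, String> usersByTenant) {
        this.usersByTenant = usersByTenant;
    }

    void setThreadKey(String key) {
        threadKey.set(key);
    }

    void clearThreadKey() {
        threadKey.remove();
    }

    // Returns the user for the bound key, or "default" if no key is bound
    // or the key is unknown.
    String currentUser() {
        String key = threadKey.get();
        return key == null ? "default" : usersByTenant.getOrDefault(key, "default");
    }
}

// Hypothetical around-advice: binds the tenant for the duration of one call.
final class TenantAdvice {
    private final ThreadKeyedSessions sessions;

    TenantAdvice(ThreadKeyedSessions sessions) {
        this.sessions = sessions;
    }

    <T> T around(String tenantId, Supplier<T> invocation) {
        sessions.setThreadKey(tenantId);
        try {
            return invocation.get();
        } finally {
            // Runs on success and on failure, so the key never leaks
            // to the next message handled by this thread.
            sessions.clearThreadKey();
        }
    }
}
```

With a tenant map such as "acme" to "acme-sftp", calling advice.around("acme", sessions::currentUser) resolves the tenant's user inside the call, and the thread falls back to the default user as soon as the call returns or throws.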


Solution

  • Got it. The javadoc for handle() says:

    Use handle(Class, GenericHandler) if you need to access the entire message.

    To receive the whole message, the Class parameter must be Message.class:

    .handle(Message.class, 
        (message, headers) -> sftpSessionFactory
            .setThreadKey(message, headers.get("tenantId")))
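
To illustrate what the Class argument changes, here is a small plain-Java model. Msg and MiniFlow are hypothetical stand-ins, not Spring classes, and the dispatch rule is my assumption about the idea rather than the framework's actual implementation: when the type token is the message type, the handler receives the whole message, otherwise it receives only the payload.

```java
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical minimal message type; NOT org.springframework.messaging.Message.
final class Msg<T> {
    private final T payload;
    private final Map<String, Object> headers;

    Msg(T payload, Map<String, Object> headers) {
        this.payload = payload;
        this.headers = headers;
    }

    T payload() {
        return payload;
    }

    Map<String, Object> headers() {
        return headers;
    }
}

// Hypothetical helper that models a single handle(Class, handler) step.
final class MiniFlow {
    static <P> Object handle(Msg<?> message, Class<P> expectedType,
                             BiFunction<P, Map<String, Object>, Object> handler) {
        // The type token decides what the first lambda argument is:
        // the message itself, or just its payload.
        Object argument = Msg.class.isAssignableFrom(expectedType)
                ? message
                : message.payload();
        // The handler's return value is what would travel on to the next step.
        return handler.apply(expectedType.cast(argument), message.headers());
    }
}
```

In this model, MiniFlow.handle(msg, Msg.class, (m, h) -> m) hands the lambda the message object, while MiniFlow.handle(msg, String.class, (p, h) -> p.toUpperCase()) hands it only the payload. Either way the lambda returns a value, which is the property that lets the flow continue.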