Motivation: I need to set the threadKey on a DelegatingSessionFactory before I route to an SFTP outbound gateway, and clear the threadKey afterwards.
Depending on the tenant, I need to use a different SFTP user account. The user accounts are configured in my application.yml, and I do not want to write a separate route for each new tenant.
public IntegrationFlow aDynamicSftpFlow() {
    return f -> f
        .handle(tenantSessionDefine()) // how can I use a lambda instead?
        .handle(Sftp.outboundGateway(delegatingSessionFactory, ...))
        .handle(...); // undefine sftp session
}
Setting the threadKey requires a Message<?>, not just payload and headers. So I use a bean, because its method can take a message:
public class TenantSessionDefine {

    private final DelegatingSessionFactory delegatingSessionFactory;

    public TenantSessionDefine(DelegatingSessionFactory delegatingSessionFactory) {
        this.delegatingSessionFactory = delegatingSessionFactory;
    }

    public Message<?> defineSession(Message<?> message) {
        // the thread key is used by the SessionFactoryLocator
        return delegatingSessionFactory.setThreadKey(message,
                message.getHeaders().get("tenantId", String.class));
    }
}
I would like to write that as a lambda, as in
.handle(message -> delegatingSessionFactory.setThreadKey(message,
        message.getPayload().getTenant()))
but that is not so easy. The handle() lambda that takes a Message<T> ends the flow, because it is a void function (the MessageHandler functional interface). The other lambda is a GenericHandler, which does not end the flow, but it takes payload and headers, not a message.
This is just an example. Every now and then I wish I could use handle() with a message in a lambda without ending the flow. How can I do that?
Update
The DelegatingSessionFactory is not a particularly well-suited example. Since setting and clearing the thread key should happen immediately before and after the SFTP invocation, an advice fits better than defining a handler before and after the call.
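The idea behind the advice can be sketched in plain Java, without any Spring classes. This is only a minimal model under stated assumptions: `THREAD_KEY` and `withThreadKey` are hypothetical stand-ins for the factory's thread key and for the advice, and the supplier stands in for the SFTP gateway call. In Spring Integration itself this would typically be a request handler advice (for example a subclass of AbstractRequestHandlerAdvice) attached to the gateway endpoint.

```java
import java.util.function.Supplier;

public class ThreadKeyAdviceSketch {

    // Hypothetical stand-in for the thread key held by the session factory.
    static final ThreadLocal<String> THREAD_KEY = new ThreadLocal<>();

    // "Around" wrapper: set the key, run the call, always clear the key.
    static <T> T withThreadKey(String key, Supplier<T> call) {
        THREAD_KEY.set(key);
        try {
            return call.get();
        } finally {
            // Runs on success and on failure, so the key never leaks
            // to the next message handled by this thread.
            THREAD_KEY.remove();
        }
    }

    public static void main(String[] args) {
        // The supplier stands in for the SFTP gateway invocation.
        String used = withThreadKey("tenantA",
                () -> "session-for-" + THREAD_KEY.get());
        System.out.println(used);             // session-for-tenantA
        System.out.println(THREAD_KEY.get()); // null
    }
}
```

The point of the try/finally is that the key is cleared even when the wrapped call throws, which two separate handlers before and after the gateway cannot guarantee.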
Got it. The javadoc for handle() says: "Use handle(Class, GenericHandler) if you need to access the entire message." The Class parameter must be Message.class:
.handle(Message.class,
(message, headers) -> sftpSessionFactory
.setThreadKey(message, headers.get("tenantId")))
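Why the Class argument changes what the lambda receives can be illustrated with a toy model. This is not Spring Integration's implementation: `Msg`, `Handler` and `dispatch` are hypothetical stand-ins, and the sketch only mirrors the behaviour the javadoc describes, namely that asking for the message type hands the whole message to the handler, while any other type hands over the payload.

```java
import java.util.Map;

public class HandleByTypeSketch {

    // Hypothetical stand-in for a message: a payload plus headers.
    static final class Msg {
        final Object payload;
        final Map<String, Object> headers;

        Msg(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Hypothetical stand-in for a handler that returns a reply,
    // so the flow can continue after it.
    interface Handler<P> {
        Object handle(P first, Map<String, Object> headers);
    }

    // If the expected type is the message type, pass the whole message;
    // otherwise pass the payload cast to the expected type.
    static <P> Object dispatch(Class<P> expectedType, Handler<P> handler, Msg message) {
        Object first = Msg.class.isAssignableFrom(expectedType)
                ? message
                : message.payload;
        return handler.handle(expectedType.cast(first), message.headers);
    }

    public static void main(String[] args) {
        Msg msg = new Msg("file.txt", Map.of("tenantId", "tenantA"));

        // Payload-typed handler: the first argument is the payload.
        System.out.println(dispatch(String.class, (p, h) -> p.length(), msg)); // 8

        // Message-typed handler: the first argument is the whole message.
        System.out.println(dispatch(Msg.class,
                (m, h) -> m.payload + "@" + h.get("tenantId"), msg)); // file.txt@tenantA
    }
}
```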