A Spring Integration based converter consumes messages from one system, checks and converts them, and sends them to the other one.
Should the target system be down, we stop the inbound adapters, but we would also like to persist locally or forward the converted messages that are currently "in-flight". For that, we would simply like to reroute the messages dynamically from the normal output channel to some "backup" channel.
In the docs I have found only the option to route messages based on their headers (so at some earlier step in the flow I would have to add those dynamically once the target system is not available), or based on the payload type, which is not really my case. Adding a header dynamically and then filtering it out further down the pipe, or during de-/serialization, still does not seem like the best approach to me. I would rather be able to flip a switch (on some internal event) that would then reroute those "in-flight" messages to the "backup" channel.
What would be the best SI approach to achieve this? Thanks!
The router does not have to be based only on the payload type or some header. You can have a general POJO method invocation that returns a channel, its name, or some routing key which is mapped to a channel. That POJO method can indeed check some internal system state and produce this or that routing key.
So, you may have something like this in the router configuration:
.route(myRouter())
where your myRouter is something like this:
@Bean
MyRouter myRouter() {
return new MyRouter();
}
and its internal code might be like this:
public class MyRouter {
@Autowired
private SystemState systemState;
String route(Object payload) {
return this.systemState.isActive() ? "successChannel" : "backupChannel";
}
}
The same can be achieved with a simple lambda definition:
.<Object, Boolean>route(p -> systemState().isActive(),
m -> m.channelMapping(true, "successChannel")
.channelMapping(false, "backupChannel"))
Also...
private final AtomicBoolean switcher = new AtomicBoolean();
@Bean
public IntegrationFlow flow() {
return IntegrationFlows.from(() -> "foo", e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(5))))
.route(s -> switcher.get() ? "foo" : "bar")
.get();
}
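The state holder that the routers above consult is not shown anywhere, so here is a minimal sketch of what it might look like, assuming it is nothing more than a thread-safe flag that some listener flips when the target system goes down or comes back. The class is plain Java with no Spring dependency. The method names markDown(), markUp() and routingKey() are hypothetical and only there for illustration. The channel names are the ones used in the router examples.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical holder for the "is the target system reachable?" flag.
// Assumption: this is application code, not a Spring Integration class.
class SystemState {

    // Starts as active; AtomicBoolean keeps the flag safe to flip from any thread.
    private final AtomicBoolean active = new AtomicBoolean(true);

    boolean isActive() {
        return active.get();
    }

    // Call from whatever listener detects that the target system went down.
    void markDown() {
        active.set(false);
    }

    // Call once the target system is reachable again.
    void markUp() {
        active.set(true);
    }

    // The same decision the router makes: pick a channel name from the current state.
    String routingKey() {
        return isActive() ? "successChannel" : "backupChannel";
    }
}
```

Because the router evaluates the flag for every message, anything still in flight after markDown() is called would be sent to the backup channel without touching headers or payloads.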