I am trying to add filter to discard the flow and continue executing the main flow even after the failure and aggregate the splitter. the expected type for both error & success are same. there is no specific aggregator logic.
@Bean
public IntegrationFlow flow() {
return f -> f
.split(Orders.class, Orders::getItems)
.enrich(e -> e.requestChannel("enrichChannel"))
.filter(Order.class, c -> c.getId() > 10 ? true : false,
e -> e.discardChannel(validationError()))
.handle(new MyHandler())
.transform(new MapToObjectTransformer(Order.class))
.enrich(e -> e.requestChannel("transformChannel"))
.filter(Order.class, c -> c.getTotal() > 100 ? true : false,
e -> e.discardChannel(validationError())).handle( new transformer())
.aggregate();
}
@Bean
public IntegrationFlow validationErrorFlow() {
return IntegrationFlows.from(validationError())
.handle(new ValidationHandler())
.get();
}
the discard channel is not joining back to the main flow to execute the next item in the split.
I can write route & subflow mapping but that will become too nested in route -> sub flows -> route -> subflows trying to solve this by using filters. is there a better way to perform validation and still continue the split for all the items in the flow.
Update 1:
.handle(request.class, (p, h) -> validator.validate(p)
.gateway("filterFlow.input")
.handle(new MyHandler())
.enrich(...)
.handle(...)
.enrich(...)
.handle(...)
.enrich(...)
.handle(...)
.aggregate();
@Bean
public IntegrationFlow filterFlow() {
return f -> f
.filter(response.class, c -> c.isValidationStatus(), df -> df.discardFlow
(flow -> flow.handle(Message.class, (p, h) -> p.getPayload())));
}
gateway is able to intercept the request but the flow executed .handle(new MyHandler())
rather than the next item in split()
Update 2: (Answer) from Artem
.handle(request.class, (p, h) -> validator.validate(p))
.filter(response.class,p -> p.isValidationStatus(), f -> f.discardChannel("aggregatorChannel"))
.handle(new MyHandler())
.enrich(...)
.handle(...)
.enrich(...)
.handle(...)
.enrich(...)
.handle(...)
.channel("aggregatorChannel")
.aggregate();
This will do a conditional skip & continue on the flow.
the discard channel is not joining back to the main flow to execute the next item in the split.
That's true. That's how it is designed. In most cases the discard flow is something like Dead Letter Queue in JMS. So, that is short one-way branch.
If you really would like to come back to the main flow, you should consider to use naming channel in the flow definition. I mean in the point where you would like to come back after compensation (discard) flow:
.filter(Order.class, c -> c.getId() > 10 ? true : false,
e -> e.discardFlow(sf -> sf
.gateway(validationError())
.channel("myHandleChannel")))
.channel("myHandleChannel")
.handle(new MyHandler())
I use gateway()
because we need a reply from the discard flow to continue processing. We need that .channel("myHandleChannel")
in the end of sub-flow because discard flow is a branch.
Another way to do that can be achieved with the .gateway()
on the main flow:
.gateway("filterFlow.input")
.handle(new MyHandler())
...
@Bean
public IntegrationFlow filterFlow() {
return f -> f
.filter(Order.class, c -> c.getId() > 10 ? true : false,
e -> e.discardChannel(validationError()));
}
We send to the discardChannel
the same request message, therefore a proper replyChannel
header for the mentioned gateway is still there. Only what you need is to ensure a proper reply producing from the .handle(new ValidationHandler())
.