I used publishSubscribeChannel with a task executor to make the subflow asynchronous.
The code is below.
@Bean
public IntegrationFlow mainFlow() {
    return IntegrationFlows.from("mainFlow")
            ..
            .publishSubscribeChannel(subFlowTaskExecutor, subscribe -> {
                subscribe.subscribe(flow ->
                        flow.channel("testFlow"));
            })
            ..
            .enrichHeaders(c -> c.header(HttpHeaders.STATUS_CODE, HttpStatus.OK))
            .get();
}
@Bean
public IntegrationFlow testFlow() {
    return IntegrationFlows.from("testFlow")
            .handle(handlerSomeThing())
            .get();
}
As you can see, mainFlow sends to testFlow. Now I want to add an errorChannel to handle exceptions thrown in testFlow. What is a good way to do that?
I tried implementing an ErrorHandler and setting it on the subscribe spec, as shown below. Is there any other way?
private TestErrorHandler errorHandler;

@Bean
public IntegrationFlow mainFlow() {
    return IntegrationFlows.from("mainFlow")
            ..
            .publishSubscribeChannel(subFlowTaskExecutor, subscribe -> {
                subscribe.errorHandler(errorHandler);
                subscribe.subscribe(flow ->
                        flow.channel("testFlow"));
            })
            ..
            .enrichHeaders(c -> c.header(HttpHeaders.STATUS_CODE, HttpStatus.OK))
            .get();
}
@Component
public class TestErrorHandler implements ErrorHandler {

    @Autowired
    private MessagingTemplate messagingTemplate;

    @Autowired
    @Qualifier(RTSChannel.PerformNameScreening.ERROR_CHANNEL)
    private MessageChannel errorChannel;

    @Override
    public void handleError(Throwable throwable) {
        messagingTemplate.send(errorChannel, new ErrorMessage(throwable));
    }

    @Bean
    public MessagingTemplate errorMessagingTemplate() {
        return new MessagingTemplate();
    }
}
@Bean
public IntegrationFlow mainFlow() {
    return IntegrationFlows.from("mainFlow")
A flow bean can't have the same name as the first channel in the flow. Assuming you meant
@Bean
public IntegrationFlow mainFlow() {
    return IntegrationFlows.from("mainFlowChannel")
Normally, the error channel would be configured on some component upstream of mainFlowChannel.
If you want to scope the error handling to just the subflow, you would need to use a .gateway() there.
.publishSubscribeChannel(subFlowTaskExecutor, subscribe ->
        subscribe.subscribe(flow ->
                flow.gateway(testFlow(), g -> g.errorChannel("errorFlowInputChannel"))))
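
For intuition about why the error handling has to sit on the async boundary: once the subflow runs on subFlowTaskExecutor, an exception is thrown on the executor thread and cannot propagate back to the thread that sent to the main flow. The following is a minimal JDK-only sketch of that idea, not Spring Integration's actual implementation. ErrorRoutingDemo, AsyncPubSub, and the queue standing in for the error channel are all hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Simplified, hypothetical model; these are NOT Spring Integration classes.
class ErrorRoutingDemo {

    /** Stand-in for an executor-backed publish-subscribe channel. */
    static final class AsyncPubSub {
        private final ExecutorService executor;
        private final Consumer<Throwable> errorHandler;
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        AsyncPubSub(ExecutorService executor, Consumer<Throwable> errorHandler) {
            this.executor = executor;
            this.errorHandler = errorHandler;
        }

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        /** Dispatches to each subscriber on the executor; the sender never sees failures. */
        void send(String payload) {
            for (Consumer<String> subscriber : subscribers) {
                executor.execute(() -> {
                    try {
                        subscriber.accept(payload);
                    } catch (RuntimeException e) {
                        // Without this hook the exception would stay on the executor thread.
                        errorHandler.accept(e);
                    }
                });
            }
        }
    }

    /** Sends one message to a failing subscriber and returns the messages of routed errors. */
    static List<String> run() {
        BlockingQueue<Throwable> errorChannel = new LinkedBlockingQueue<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            AsyncPubSub channel = new AsyncPubSub(executor, errorChannel::add);
            channel.subscribe(p -> {
                throw new IllegalStateException("failed: " + p);
            });
            channel.send("order-1");
            // Wait for the error to arrive on the stand-in error channel.
            Throwable t = errorChannel.poll(5, TimeUnit.SECONDS);
            return t == null ? List.of() : List.of(t.getMessage());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return List.of();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [failed: order-1]
    }
}
```

In this model the error handler passed to AsyncPubSub plays the role of subscribe.errorHandler(...) in the question, and the queue plays the role of the error channel; the .gateway() approach scopes the same kind of hook to a single subflow instead of the whole channel.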