spring, spring-boot, spring-integration

Direct / Executor Channel - bypass failover for a custom exception when the failover strategy is set to true (don't use other subscribers)


Is it possible to bypass the failover strategy in a DirectChannel when a custom exception is thrown?

If I add a custom error channel, the error message reaches it only after all subscribers have been exhausted (in my case, after every JmsOutboundGateway has failed).

I would like to check which error occurred first in inputLocalChannel (in my case, in the JmsOutboundGateway). If it is an expected error, processing should stop there (no failover, so the message is not passed to the other subscriber) and the error should be thrown, that is, passed to the callback -> onError.


@MessagingGateway(name = "LocalGateway" , errorChannel = "defaultErrorChannel")
public interface LocalGateway {

    @Gateway(requestChannel = "inputLocalChannel")
    ListenableFuture<Message<String>> sendMsg(Message<String> request);

}


    @Bean
    public IntegrationFlow someFlow() {
        return f -> f
                .routeByException(r -> r
                        .channelMapping(MessageTimeoutException.class, "customErrorChannel")
                        .defaultOutputChannel("defaultErrorChannel"));
    }


    @Bean
    public MessageChannel defaultErrorChannel() {

        DirectChannel dr = new DirectChannel();
        return dr;
    }

    @Bean
    public MessageChannel customErrorChannel() {

        DirectChannel dr = new DirectChannel();
        return dr;
    }


    @ServiceActivator(inputChannel = "defaultErrorChannel")
    private void processDefaultError(ErrorMessage errMsg) throws Throwable {
        // ErrorMessage.getPayload() returns a Throwable, so the method must declare it
        throw errMsg.getPayload();
    }

    @ServiceActivator(inputChannel = "customErrorChannel")
    private void processCustomError(ErrorMessage errMsg) {

      // some code 

    }


    @Bean
    @ServiceActivator(inputChannel = "inputLocalChannel")
    public Message<String> firstHandler() {
        // some code
    }

    @Bean
    @ServiceActivator(inputChannel = "inputLocalChannel")
    public Message<String> secondHandler() {
        // some code
    }


    @Bean
    @ServiceActivator(inputChannel = "inputGateway")
    public JmsOutboundGateway firstGateway() {
        // some code
    }

    @Bean
    @ServiceActivator(inputChannel = "inputGatewayBck")
    public JmsOutboundGateway secondGateway() {
        // some code
    }


Solution

  • It looks like you want something that could be called a "conditional failover".

    That feature is indeed not available for DirectChannel, and I don't think it has to be.

    I suggest you look into an ErrorMessageExceptionTypeRouter instead: https://docs.spring.io/spring-integration/reference/router/implementations.html#router-implementations-exception-router. This way you would need more channels, including the one set in the errorChannel option of the @MessagingGateway. So, when an error happens on the first gateway, you handle it with that ErrorMessageExceptionTypeRouter and decide where to go next.

    UPDATE

    This is what I had in mind for your "conditional failover":

    @MessagingGateway(name = "LocalGateway", errorChannel = "routeErrorChannel")
    public interface LocalGateway {

        @Gateway(requestChannel = "inputLocalChannel")
        ListenableFuture<Message<String>> sendMsg(Message<String> request);

    }


    @Bean
    public IntegrationFlow errorFlow() {
        return IntegrationFlow.from("routeErrorChannel")
                // route the ErrorMessage by the type of the exception it carries
                .routeByException(r -> r
                        .channelMapping(MessageTimeoutException.class, "secondInputLocalChannel")
                        .defaultOutputChannel("rethrowErrorChannel"))
                // the builder has to be turned into an IntegrationFlow
                .get();
    }

    @Bean
    @ServiceActivator(inputChannel = "inputLocalChannel")
    public Message<String> firstHandler() {
        // some code
    }

    @Bean
    @ServiceActivator(inputChannel = "secondInputLocalChannel")
    public Message<String> secondHandler() {
        // some code
    }

    

    So, you catch an error from the inputLocalChannel in that routeErrorChannel. There, with routeByException(), you make a decision according to the exception you got. In my example the message is sent to secondInputLocalChannel when the exception is a MessageTimeoutException. However, you still need to unwrap the failedMessage from that exception before publishing it to secondInputLocalChannel. Otherwise the whole ErrorMessage is going to be sent. See the MessagingException.getFailedMessage() property.
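
To make the decision logic above concrete, here is a minimal plain-Java sketch of the same idea: try the first handler, fail over to the second one only for an expected exception type, and hand the second handler the original message rather than the error wrapper. It is a model only, not Spring Integration code. `ConditionalFailoverSketch`, `FailedDelivery` and `Handler` are hypothetical names invented for this illustration; `FailedDelivery` merely plays the role that MessagingException and its getFailedMessage() play in the flow, and `TimeoutException` stands in for MessageTimeoutException.

```java
import java.util.concurrent.TimeoutException;

// Plain-Java model of "conditional failover"; none of these types belong to Spring Integration.
final class ConditionalFailoverSketch {

    /** Hypothetical stand-in for MessagingException: keeps the cause and the message that failed. */
    static final class FailedDelivery extends RuntimeException {
        private final String failedMessage;

        FailedDelivery(String failedMessage, Throwable cause) {
            super(cause);
            this.failedMessage = failedMessage;
        }

        String getFailedMessage() {
            return failedMessage;
        }
    }

    /** Hypothetical handler contract: returns a reply for a payload or throws. */
    interface Handler {
        String handle(String payload) throws Exception;
    }

    /** Tries the primary handler; fails over to the backup only when the cause is a timeout. */
    static String send(String payload, Handler primary, Handler backup) {
        try {
            return invoke(primary, payload);
        } catch (FailedDelivery e) {
            if (e.getCause() instanceof TimeoutException) {
                // Unwrap the original message first, so the backup does not receive the error wrapper.
                return invoke(backup, e.getFailedMessage());
            }
            // Any other error stops processing and is propagated to the caller.
            throw e;
        }
    }

    /** Calls a handler and wraps any failure together with the message that caused it. */
    private static String invoke(Handler handler, String payload) {
        try {
            return handler.handle(payload);
        } catch (Exception ex) {
            throw new FailedDelivery(payload, ex);
        }
    }
}
```

In the real flow, the `instanceof` check corresponds to the channelMapping() in routeByException(), the rethrow corresponds to the defaultOutputChannel, and the call to `getFailedMessage()` corresponds to the unwrapping step that has to happen between the router and secondInputLocalChannel.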