
Spring Integration: converting a gateway error to a message fails


I want my service gateway to return any error raised in its immediate flow (up to the nearest async channel) to the service caller as a valid message rather than an exception, and to do so synchronously.

The entire flow runs on a single thread. My minimal example does not work as intended, and I can't figure out the proper way to use the framework to achieve this. Please see the code below.

@RunWith(SpringRunner.class)
public class ErrorHandlingTests {

    @Autowired
    ErrorsHandlingService errorsHandlingService;

    @EnableIntegration
    @Configuration
    static class Config {

        @Bean
        IntegrationFlow errorHandler() {
            return IntegrationFlows.from("errorChannel").
                    handle(errorMessage -> {
                        Message<?> failedMessage = ((MessagingException) errorMessage.getPayload()).getFailedMessage();
                        MessageChannel replyChannel = (MessageChannel) failedMessage.getHeaders().getReplyChannel();
                        replyChannel.send(new GenericMessage<>("Failure for " + failedMessage.getPayload()));
                    })
                    .get();
        }

        @Bean
        IntegrationFlow errorsHandlingFlow1() {
            return IntegrationFlows.from(ErrorsHandlingService.class, gws -> gws.errorChannel("errorChannel"))
                    .transform(new AbstractPayloadTransformer<String, String>() {
                        @Override
                        protected String transformPayload(String s) {
                            if (s.contains("oops"))
                                throw new IllegalArgumentException("Bad value");
                            return "R: " + s;
                        }
                    })
                    .get();
        }
    }


    @Test
    public void testErrorHandlingInFlow1() {
        assertEquals("R: a", errorsHandlingService.process("a"));
        assertEquals("Failure for oops", errorsHandlingService.process("oops"));
    }
}

This test hangs on the second assert, and the log prints:

W 210124 161538.597 [] [main] GenericMessagingTemplate$TemporaryReplyChannel - Reply message received but the receiving thread has exited due to an exception while sending the request message: GenericMessage [payload=Failure for oops, headers={id=57f79307-0778-88b4-9261-9040633cfc03, timestamp=1611494138597}]

All of this happens on the latest 5.3.4 release.


Solution

  • You don't need to call replyChannel.send() manually. To handle the error and produce a compensating reply, it is enough to do this:

    .<MessagingException>handle((ex, h) -> "Failure for " + ex.getFailedMessage().getPayload())
    

    The logic in the gateway is like this:

    1. Perform a send-and-receive, sync or async, via the replyChannel header.
    2. If that fails, catch the exception and process it in handleSendAndReceiveError().
    3. That method takes the errorChannel configured for this gateway and performs a send-and-receive of an ErrorMessage.
    4. The reply from that sub-flow is expected as a regular reply.

    The actual problem is that you use the replyChannel from the failedMessage, which is no longer valid because the main flow has already failed. At this point we are dealing with a new error message flow, so we should rely on the headers of that error message instead. Or just let the framework do the reply correlation for us!
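
To make the four steps and the stale replyChannel more concrete, here is a rough plain-Java model of that logic. It does not use Spring Integration at all: GatewayErrorModel, ReplyChannel, Message, FlowException and the two error-flow methods are hypothetical names invented for this sketch, and the real gateway is considerably more involved. A null result stands for "no reply arrived"; in the question's test the gateway presumably keeps waiting at that point, which would explain the hang.

```java
import java.util.function.Consumer;

public class GatewayErrorModel {

    /** One-shot reply holder, loosely modeled on a temporary reply channel (hypothetical). */
    static final class ReplyChannel {
        Object reply;
        boolean sendFailed;

        void send(Object value) {
            if (sendFailed) {
                return; // late reply: the caller already gave up on this channel, so it is dropped
            }
            reply = value;
        }
    }

    /** Simplified message: a payload plus the channel the reply should go to. */
    static final class Message {
        final Object payload;
        final ReplyChannel replyChannel;

        Message(Object payload, ReplyChannel replyChannel) {
            this.payload = payload;
            this.replyChannel = replyChannel;
        }
    }

    /** Stand-in for MessagingException: remembers the message that failed. */
    static final class FlowException extends RuntimeException {
        final Message failedMessage;

        FlowException(Message failedMessage, Throwable cause) {
            super(cause);
            this.failedMessage = failedMessage;
        }
    }

    /** Step 1: send the payload with a fresh reply channel and read the reply back. */
    static Object sendAndReceive(Object payload, Consumer<Message> flow) {
        ReplyChannel channel = new ReplyChannel();
        Message message = new Message(payload, channel);
        try {
            flow.accept(message);
        } catch (RuntimeException e) {
            channel.sendFailed = true; // this channel is no longer usable
            throw new FlowException(message, e);
        }
        return channel.reply; // null means no reply arrived
    }

    /** Steps 2 to 4: on failure, send the error to the error flow and expect a regular reply. */
    static Object gateway(Object payload, Consumer<Message> errorFlow) {
        try {
            return sendAndReceive(payload, GatewayErrorModel::mainFlow);
        } catch (FlowException e) {
            return sendAndReceive(e, errorFlow);
        }
    }

    /** Same behavior as the transformer in the question. */
    static void mainFlow(Message m) {
        String s = (String) m.payload;
        if (s.contains("oops")) {
            throw new IllegalArgumentException("Bad value");
        }
        m.replyChannel.send("R: " + s);
    }

    /** The question's approach: reply on the failed message's (stale) channel. */
    static void staleReplyErrorFlow(Message errorMessage) {
        Message failed = ((FlowException) errorMessage.payload).failedMessage;
        failed.replyChannel.send("Failure for " + failed.payload);
    }

    /** The answer's approach: reply on the error message's own channel. */
    static void correlatedErrorFlow(Message errorMessage) {
        Message failed = ((FlowException) errorMessage.payload).failedMessage;
        errorMessage.replyChannel.send("Failure for " + failed.payload);
    }

    public static void main(String[] args) {
        System.out.println(gateway("a", GatewayErrorModel::correlatedErrorFlow));
        System.out.println(gateway("oops", GatewayErrorModel::staleReplyErrorFlow));
        System.out.println(gateway("oops", GatewayErrorModel::correlatedErrorFlow));
    }
}
```

In this model the second call yields no reply because the message is sent to a channel whose caller has already left, while the third call gets "Failure for oops" back through the error message's own channel. Returning the value from the handler, as in the one-liner above, leaves that correlation to the framework.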