I want my service gateway to return any error in its immediate flow (up to the nearest async channel) to service caller as a valid message (not exception), in sync fashion.
Entire flow is on a single thread. My minimal example refuses to work as desired and I can't figure out proper way to use framework to achieve my goal. Please, see code below.
@RunWith(SpringRunner.class)
public class ErrorHandlingTests {
@Autowired
ErrorsHandlingService errorsHandlingService;
@EnableIntegration
@Configuration
static class Config {
@Bean
IntegrationFlow errorHandler() {
return IntegrationFlows.from("errorChannel").
handle(errorMessage -> {
Message<?> failedMessage = ((MessagingException) errorMessage.getPayload()).getFailedMessage();
MessageChannel replyChannel = (MessageChannel) failedMessage.getHeaders().getReplyChannel();
replyChannel.send(new GenericMessage<>("Failure for " + failedMessage.getPayload()));
})
.get();
}
@Bean
IntegrationFlow errorsHandlingFlow1() {
return IntegrationFlows.from(ErrorsHandlingService.class, gws -> gws.errorChannel("errorChannel"))
.transform(new AbstractPayloadTransformer<String, String>() {
@Override
protected String transformPayload(String s) {
if (s.contains("oops"))
throw new IllegalArgumentException("Bad value");
return "R: " + s;
}
})
.get();
}
}
@Test
public void testErrorHandlingInFlow1() {
assertEquals("R: a", errorsHandlingService.process("a"));
assertEquals("Failure for oops", errorsHandlingService.process("oops"));
}
}
This test hangs on 2nd assert and log prints:
W 210124 161538.597 [] [main] GenericMessagingTemplate$TemporaryReplyChannel - Reply message received but the receiving thread has exited due to an exception while sending the request message: GenericMessage [payload=Failure for oops, headers={id=57f79307-0778-88b4-9261-9040633cfc03, timestamp=1611494138597}]
All of this happens on the latest 5.3.4 release.
You don't need to use that replyChannel.send()
manually. For such an error handling and compensation reply producing you just need to do like this:
<MessagingException>handle((ex, h) -> "Failure for " + ex.getFailedMessage().getPayload())
The logic in the gateway is like this:
replyChannel
header.handleSendAndReceiveError()
errorChannel
configured for this gateway and performs send-n-receive of ErrorMessage
Well, the actual problem is that you use a replyChannel
from the failedMessage
, which is not valid any more since the main flow has failed already. At this point we deal with new error message flow and we really should rely on its headers already. Or just let the framework to do reply correlation for us!