We have a custom error channel for our application which mainly notifies integration failures via Email. When there is an exception thrown in the SendEmail Http call (WebFlux.outboundGateway) of the custom error channel, the integration flow goes on an infinite loop.
Please help stop this behavior
@Bean("appErrorChannel")
public MessageChannel appErrorChannel() {
return new DirectChannel();
}
@Bean("appErrorFlow")
public IntegrationFlow errorFlow() {
// @formatter:off
return IntegrationFlows.from(appErrorChannel())
.<MessagingException> log(ERROR, message -> "Exception: " + ExceptionUtils.getStackTrace(message.getPayload()))
.transform(transformer, "errorMessage")
.transform(transformer, "emailRequest")
.log(INFO, message -> "Email Request: " + message)
.enrichHeaders(headerEnricherSpec -> headerEnricherSpec.header(CONTENT_TYPE, APPLICATION_JSON_VALUE))
.enrichHeaders(Collections.singletonMap(MessageHeaders.ERROR_CHANNEL, errorFlowErrorChannel()))
.handle(WebFlux.outboundGateway(sendEmailURL, webClient)
.httpMethod(POST)
.expectedResponseType(String.class)
.mappedRequestHeaders(CONTENT_TYPE))
.log(INFO, message -> "Email Response: " + message)
.get();
// @formatter:on
}
@Bean("errorFlowErrorChannel")
public MessageChannel errorFlowErrorChannel() {
return new DirectChannel();
}
@Bean("errorChannelErrorFlow")
public IntegrationFlow errorChannelErrorFlow() {
// @formatter:off
return IntegrationFlows.from(errorFlowErrorChannel())
.<MessagingException> log(FATAL, message -> "Exception in Error Flow: " + ExceptionUtils.getStackTrace(message.getPayload()))
.get();
// @formatter:on
}
Exception:
2022-04-09 11:45:11,198[0;39m [boundedElastic-1 ] [31mERROR[0;39m [36mo.s.i.w.o.WebFluxRequestExecutingMessageHandler[0;39m - [35me2abc8b6eba0119c[0;39m Failed to send async reply
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application-1.appErrorChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.web.reactive.function.client.WebClientResponseException$BadRequest: 400 Bad Request from POST ****** URL ******, failedMessage=GenericMessage [payload=******* SUPPRESSED ********}, headers={b3=e2abc8b6eba0119c-c4254a192695705d-1, nativeHeaders={}, errorChannel=bean 'appErrorChannel'; defined in: 'class path resource [**************}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:457)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendErrorMessage(AbstractMessageProducingHandler.java:496)
at org.springframework.integration.handler.AbstractMessageProducingHandler$ReplyFutureCallback.onFailure(AbstractMessageProducingHandler.java:555)
at org.springframework.util.concurrent.ListenableFutureCallbackRegistry.notifyFailure(ListenableFutureCallbackRegistry.java:86)
at org.springframework.util.concurrent.ListenableFutureCallbackRegistry.failure(ListenableFutureCallbackRegistry.java:158)
.....
.....
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
It is really not recommended to have a single global error channel for the whole application.
Consider to have its own error channel for this specific error handling flow.
Set it via enrichHeaders(MessageHeaders.ERROR_CHANNEL, "errorFlowErrorChannel")
and there is not going to be an infinite loop because an error from the WebFlux.outboundGateway()
is not going to be forwarded to the appErrorChannel
any more.