We are facing strange behavior in our integration flows: the errorChannel does not receive a message when an exception is thrown in a step after an aggregation.
This is the (reduced) flow:
@Bean
public StandardIntegrationFlow startKafkaInbound() {
    return IntegrationFlows.from(Kafka
            .messageDrivenChannelAdapter(
                    kafkaConsumerFactory,
                    ListenerMode.record,
                    serviceProperties.getInputTopic().getName())
            .errorChannel(errorHandler.getInputChannel())
    )
            .channel(nextChannel().getInputChannel())
            .get();
}

@Bean
public IntegrationFlow nextChannel() {
    return IntegrationFlows.from("next")
            .transform(Transformers.fromJson(MyObject.class)) // An exception here is sent to errorChannel
            .aggregate(aggregatorSpec ->
                    aggregatorSpec
                            .releaseStrategy(new MessageCountReleaseStrategy(100))
                            .sendPartialResultOnExpiry(true)
                            .groupTimeout(2000L)
                            .expireGroupsUponCompletion(true)
                            .correlationStrategy(message -> KafkaHeaderUtils.getOrDefault(message.getHeaders(), MY_CORRELATION_HEADER, ""))
            )
            .transform(myObjectTransformer) // Exception here is not sent to errorChannel
            .channel(acknowledgeMyObjectFlow().getInputChannel())
            .get();
}
If we add an explicit channel that is not a DirectChannel, error handling works as expected. The working code looks like this:
// ...
.aggregate(aggregatorSpec -> ...)
.channel(MessageChannels.queue())
.transform(myObjectTransformer) // Now the exception is sent to errorChannel
.channel(acknowledgeMyObjectFlow().getInputChannel())
// ...
We'd also like to mention that we have a very similar flow with an aggregation where error handling works as expected (the exception is sent to errorChannel).
So we were able to get the code running, but since error handling is a critical part of the application, we'd really like to understand how we can ensure that every error is sent to the configured channel, and why explicitly setting a QueueChannel leads to the desired behavior.
Thanks in advance
You can add this before the aggregator:
.enrichHeaders(headers -> headers.header(MessageHeaders.ERROR_CHANNEL, errorHandler.getInputChannel()))
The .channel(MessageChannels.queue()) is misleading here, because with it the error is handled by the poller of that QueueChannel and sent to the global errorChannel, which apparently is the same as your errorHandler.getInputChannel().
The problem is that the .groupTimeout(2000L) release is performed on a separate TaskScheduler thread, so when an error happens downstream, there is no knowledge about the try..catch in that Kafka.messageDrivenChannelAdapter.
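To see why the adapter's try..catch cannot help, here is a minimal sketch using plain java.util.concurrent rather than Spring Integration. By assumption, the ScheduledExecutorService stands in for the aggregator's TaskScheduler and the failing Runnable stands in for myObjectTransformer after the group is released; ThreadBoundaryDemo and whereDidTheErrorGo are hypothetical names for illustration only.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ThreadBoundaryDemo {

    // Models the adapter thread: it wraps its own work in try/catch, but the
    // group release (and the failing downstream step) runs later on a scheduler thread.
    static String whereDidTheErrorGo() throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            ScheduledFuture<?> future;
            try {
                // Hypothetical stand-in for the transformer after the aggregator failing on release.
                Runnable release = () -> {
                    throw new IllegalStateException("downstream failure");
                };
                future = scheduler.schedule(release, 50, TimeUnit.MILLISECONDS);
            } catch (RuntimeException e) {
                // Never reached: scheduling succeeds, the failure happens later.
                return "caught by caller: " + e.getMessage();
            }
            try {
                future.get();
                return "no error";
            } catch (ExecutionException e) {
                // The exception is held by the scheduler's Future, outside the caller's try/catch.
                return "scheduler future: " + e.getCause().getMessage();
            }
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(whereDidTheErrorGo()); // prints "scheduler future: downstream failure"
    }
}
```

The caller's catch block is never entered: scheduling itself succeeds, and the failure only surfaces on the scheduler thread.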
Feel free to raise a GH issue, so we can consider populating that errorChannel into the message headers from MessageProducerSupport implementations, such as that Kafka.messageDrivenChannelAdapter. That way the error handling would be the same regardless of the async nature of the downstream flow.
UPDATE
Please try this as a solution:
.transform(Transformers.fromJson(MyDataObject.class)) // An exception here is sent to errorChannel
.enrichHeaders(headers -> headers.header(MessageHeaders.ERROR_CHANNEL, (errorHandler.getInputChannel())))
.aggregate(aggregatorSpec ->
The enrichHeaders() call should do the trick: it tells the framework which error channel to send the error to.
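The idea behind the errorChannel header can be modeled in a few lines: the header travels with the message, so whoever handles a failure later, on any thread, can look it up and fall back to the global channel only when it is missing. This is a simplified, hypothetical model (ToyMessage, resolveErrorChannel and routeFailure are illustrative names, and BlockingQueue instances stand in for channels), not Spring's MessagePublishingErrorHandler code; in Spring the header value may also be a channel name resolved through a channel resolver.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class ErrorChannelHeaderDemo {

    // Same header name as MessageHeaders.ERROR_CHANNEL in Spring Messaging.
    static final String ERROR_CHANNEL = "errorChannel";

    // Hypothetical minimal message: a payload plus a header map.
    static final class ToyMessage {
        final Object payload;
        final Map<String, Object> headers;

        ToyMessage(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Simplified header-based resolution: use the message's own errorChannel
    // header if present, otherwise fall back to the global channel.
    @SuppressWarnings("unchecked")
    static BlockingQueue<Throwable> resolveErrorChannel(ToyMessage failed,
                                                       BlockingQueue<Throwable> globalErrorChannel) {
        Object header = failed.headers.get(ERROR_CHANNEL);
        if (header instanceof BlockingQueue) {
            return (BlockingQueue<Throwable>) header;
        }
        return globalErrorChannel;
    }

    // Reports which channel received the error, with or without the header.
    static String routeFailure(boolean enrichHeaders) {
        BlockingQueue<Throwable> myErrorChannel = new LinkedBlockingQueue<>();
        BlockingQueue<Throwable> globalErrorChannel = new LinkedBlockingQueue<>();
        Map<String, Object> headers = new HashMap<>();
        if (enrichHeaders) {
            // Analogous to .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, ...)).
            headers.put(ERROR_CHANNEL, myErrorChannel);
        }
        ToyMessage failed = new ToyMessage("payload", headers);
        resolveErrorChannel(failed, globalErrorChannel).add(new IllegalStateException("boom"));
        if (myErrorChannel.size() == 1) {
            return "my error channel";
        }
        if (globalErrorChannel.size() == 1) {
            return "global errorChannel";
        }
        return "nowhere";
    }

    public static void main(String[] args) {
        System.out.println(routeFailure(true));  // prints "my error channel"
        System.out.println(routeFailure(false)); // prints "global errorChannel"
    }
}
```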
Plus, your MyDataObjectTransformer has to be modified so that the exception it throws carries the source message:
throw new MessageTransformationException(source, "test");
The point is that the endpoint has logic like this for when an exception is caught:
if (handler != null) {
    try {
        handler.handleMessage(message);
        return true;
    }
    catch (Exception e) {
        throw IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message,
                () -> "Dispatcher failed to deliver Message", e);
    }
}
where IntegrationUtils.wrapInDeliveryExceptionIfNecessary() does this:
if (!(ex instanceof MessagingException) ||
        ((MessagingException) ex).getFailedMessage() == null) {
    runtimeException = new MessageDeliveryException(message, text.get(), ex);
}
And then in the AbstractCorrelatingMessageHandler:
catch (MessageDeliveryException ex) {
    logger.warn(ex, () ->
            "The MessageGroup [" + groupId +
                    "] is rescheduled by the reason of: ");
    scheduleGroupToForceComplete(groupId);
}
That's why your exception never reaches the error channel: the group is rescheduled instead.
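The interplay between the dispatcher wrapping and the aggregator's catch block can be reproduced with toy exception classes. MessagingEx, DeliveryEx and TransformationEx are hypothetical stand-ins that only mimic the shape of MessagingException, MessageDeliveryException and MessageTransformationException; releaseGroup is a simplified model of the group release, not the real AbstractCorrelatingMessageHandler.

```java
class AggregatorReleaseDemo {

    // Hypothetical stand-ins shaped like Spring's MessagingException hierarchy (not the real classes).
    static class MessagingEx extends RuntimeException {
        final Object failedMessage;

        MessagingEx(Object failedMessage, String text, Throwable cause) {
            super(text, cause);
            this.failedMessage = failedMessage;
        }
    }

    static class DeliveryEx extends MessagingEx {
        DeliveryEx(Object failedMessage, String text, Throwable cause) {
            super(failedMessage, text, cause);
        }
    }

    static class TransformationEx extends MessagingEx {
        TransformationEx(Object failedMessage, String text) {
            super(failedMessage, text, null);
        }
    }

    // Mirrors the quoted wrapInDeliveryExceptionIfNecessary() condition.
    static RuntimeException wrapIfNecessary(Object message, RuntimeException ex) {
        if (!(ex instanceof MessagingEx) || ((MessagingEx) ex).failedMessage == null) {
            return new DeliveryEx(message, "Dispatcher failed to deliver Message", ex);
        }
        return ex;
    }

    // Models the aggregator releasing a group into a downstream step.
    static String releaseGroup(Object groupMessage, Runnable downstream) {
        try {
            try {
                downstream.run();
                return "delivered";
            } catch (RuntimeException e) {
                // Dispatcher side: wrap unless the exception already carries a failed message.
                throw wrapIfNecessary(groupMessage, e);
            }
        } catch (DeliveryEx ex) {
            // Aggregator side: treated as a delivery problem, so the group is rescheduled.
            return "rescheduled";
        } catch (RuntimeException ex) {
            // Anything else escapes towards the error handling.
            return "propagated: " + ex.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(releaseGroup("group", () -> {
            throw new TransformationEx(null, "test");    // no failed message
        }));                                             // prints "rescheduled"
        System.out.println(releaseGroup("group", () -> {
            throw new TransformationEx("group", "test"); // carries the failed message
        }));                                             // prints "propagated: TransformationEx"
    }
}
```

Without a failed message, the model returns "rescheduled"; once the exception carries the message, it escapes instead, which corresponds to the error reaching the error handling rather than the group being retried.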
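Alternatively, you may consider not using that MessageTransformationException at all. The logic in the wrapping handler (MessageTransformingHandler) is like this, so any other exception gets wrapped together with the failed message: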
protected Object handleRequestMessage(Message<?> message) {
    try {
        return this.transformer.transform(message);
    }
    catch (Exception e) {
        if (e instanceof MessageTransformationException) { // NOSONAR
            throw (MessageTransformationException) e;
        }
        throw new MessageTransformationException(message, "Failed to transform Message in " + this, e);
    }
}
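The effect of that handler logic can be checked with a small model: an arbitrary exception from the transformer is wrapped together with the message, whereas a TransformationEx thrown by the transformer is rethrown untouched, keeping whatever failed message (possibly none) it was created with. TransformationEx, ToyTransformer, handle and failedMessageAfter are hypothetical names for this sketch only, not Spring Integration types.

```java
class TransformingHandlerDemo {

    // Hypothetical stand-in for MessageTransformationException (not the real class).
    static class TransformationEx extends RuntimeException {
        final Object failedMessage;

        TransformationEx(Object failedMessage, String text, Throwable cause) {
            super(text, cause);
            this.failedMessage = failedMessage;
        }
    }

    // Hypothetical transformer contract used only in this sketch.
    interface ToyTransformer {
        Object transform(Object message) throws Exception;
    }

    // Mirrors the quoted handleRequestMessage(): rethrow TransformationEx untouched,
    // wrap every other exception together with the message being transformed.
    static Object handle(Object message, ToyTransformer transformer) {
        try {
            return transformer.transform(message);
        } catch (Exception e) {
            if (e instanceof TransformationEx) {
                throw (TransformationEx) e;
            }
            throw new TransformationEx(message, "Failed to transform Message", e);
        }
    }

    // Returns the failed message carried by the exception leaving the handler (may be null).
    static Object failedMessageAfter(Object message, ToyTransformer transformer) {
        try {
            handle(message, transformer);
        } catch (TransformationEx e) {
            return e.failedMessage;
        }
        throw new IllegalStateException("transformer did not fail");
    }

    public static void main(String[] args) {
        // A plain exception is wrapped with the message: prints "msg".
        System.out.println(failedMessageAfter("msg", m -> {
            throw new IllegalArgumentException("bad json");
        }));
        // A TransformationEx without a message stays without one: prints "null".
        System.out.println(failedMessageAfter("msg", m -> {
            throw new TransformationEx(null, "test", null);
        }));
    }
}
```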
UPDATE 2
OK. I see that you use Spring Boot, and it does not register an appropriate ErrorHandler on the TaskScheduler used by the aggregator for the group timeout feature.
Please consider adding this bean to your configuration:
@Bean
TaskSchedulerCustomizer taskSchedulerCustomizer(ErrorHandler integrationMessagePublishingErrorHandler) {
    return taskScheduler -> taskScheduler.setErrorHandler(integrationMessagePublishingErrorHandler);
}
And then feel free to raise a GH issue for Spring Boot to make this customization the default in the auto-configuration.
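Conceptually, an ErrorHandler on the scheduler decorates each scheduled task so that its failure is handed over instead of staying buried in the task's Future; the integrationMessagePublishingErrorHandler then turns it into an ErrorMessage for the resolved error channel. This sketch models only the decoration step with plain java.util.concurrent; withErrorHandler and runGroupTimeout are hypothetical names, and a BlockingQueue stands in for the error channel.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class SchedulerErrorHandlerDemo {

    // Wraps a task so that any failure is handed to the error handler
    // instead of being kept only inside the task's Future.
    static Runnable withErrorHandler(Runnable task, Consumer<Throwable> errorHandler) {
        return () -> {
            try {
                task.run();
            } catch (Throwable t) {
                errorHandler.accept(t);
            }
        };
    }

    // Models a group-timeout release whose downstream step fails.
    static String runGroupTimeout() throws InterruptedException {
        BlockingQueue<Throwable> errorChannel = new LinkedBlockingQueue<>();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            Runnable release = () -> {
                throw new IllegalStateException("transformer failed");
            };
            scheduler.schedule(withErrorHandler(release, errorChannel::add), 50, TimeUnit.MILLISECONDS);
            // Wait for the error to arrive on the stand-in error channel.
            Throwable received = errorChannel.poll(5, TimeUnit.SECONDS);
            return received == null ? "nothing received" : "errorChannel got: " + received.getMessage();
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runGroupTimeout()); // prints "errorChannel got: transformer failed"
    }
}
```

Compared with an undecorated task, the failure now arrives on the stand-in error channel even though it happened on the scheduler thread.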