spring-integration, spring-integration-dsl

Configured errorChannel not called after aggregation


We are seeing strange behavior in our integration flows: the errorChannel does not receive a message when an exception is thrown in a step after an aggregation.

This is the (reduced) flow:

    @Bean
    public StandardIntegrationFlow startKafkaInbound() {
        return IntegrationFlows.from(Kafka
                .messageDrivenChannelAdapter(
                        kafkaConsumerFactory,
                        ListenerMode.record,
                        serviceProperties.getInputTopic().getName())
                .errorChannel(errorHandler.getInputChannel())
        )
                .channel(nextChannel().getInputChannel())
                .get();
    }

    @Bean
    public IntegrationFlow nextChannel() {
        return IntegrationFlows.from("next")
                .transform(Transformers.fromJson(MyObject.class))  // An exception here is sent to errorChannel
                .aggregate(aggregatorSpec ->
                        aggregatorSpec
                                .releaseStrategy(new MessageCountReleaseStrategy(100))
                                .sendPartialResultOnExpiry(true)
                                .groupTimeout(2000L)
                                .expireGroupsUponCompletion(true)
                                .correlationStrategy(message -> KafkaHeaderUtils.getOrDefault(message.getHeaders(), MY_CORRELATION_HEADER, ""))
                )
                .transform(myObjectTransformer)  // An exception here is NOT sent to errorChannel
                .channel(acknowledgeMyObjectFlow().getInputChannel())
                .get();
    }

If we add an explicit channel that is not a DirectChannel, error handling works as expected. The working code looks like this:

    // ...
    .aggregate(aggregatorSpec -> ...)
    .channel(MessageChannels.queue())
    .transform(myObjectTransformer)  // Now the exception is sent to errorChannel
    .channel(acknowledgeMyObjectFlow().getInputChannel())
    // ...

We'd also like to mention that we have a very similar flow with an aggregation where error handling works as expected (the exception is sent to the errorChannel).

So we were able to get the code running, but since error handling is a critical part of the application, we'd really like to understand how we can ensure that every error is sent to the configured channel, and why explicitly setting a QueueChannel leads to the desired behavior.

Thanks in advance


Solution

  • You can add this

    .enrichHeaders(headers -> headers.header(MessageHeaders.ERROR_CHANNEL, errorHandler.getInputChannel()))
    

    before an aggregator.

    The .channel(MessageChannels.queue()) is misleading here: the poller that consumes from the QueueChannel sends errors to the global errorChannel, which apparently is the same channel as your errorHandler.getInputChannel().

    The problem is that the .groupTimeout(2000L) release is performed on a separate TaskScheduler thread, so when an error happens downstream, there is no try..catch from that Kafka.messageDrivenChannelAdapter on the call stack to route it to the adapter's errorChannel.
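    To make the threading point concrete, here is a plain-JDK model, not Spring code: the "listener" thread wraps its work in try/catch (the analogue of the adapter's errorChannel handling), but the failing downstream step runs later on a ScheduledExecutorService that stands in for the aggregator's group-timeout TaskScheduler. All class and method names are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class GroupTimeoutThreadDemo {

    /** Where the downstream failure ended up. */
    static final class Outcome {
        final boolean caughtByListenerThread;
        final Throwable capturedOnSchedulerThread;

        Outcome(boolean caughtByListenerThread, Throwable capturedOnSchedulerThread) {
            this.caughtByListenerThread = caughtByListenerThread;
            this.capturedOnSchedulerThread = capturedOnSchedulerThread;
        }
    }

    static Outcome simulateGroupTimeoutRelease() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        boolean caughtByListenerThread = false;
        ScheduledFuture<?> release = null;
        try {
            // The listener thread only *schedules* the partial-group release; the failing
            // downstream step (a stand-in for myObjectTransformer) runs later on the scheduler thread.
            release = scheduler.schedule(() -> {
                throw new IllegalStateException("transformer failed after aggregation");
            }, 50, TimeUnit.MILLISECONDS);
        } catch (RuntimeException e) {
            // Analogue of the adapter's errorChannel handling; not reached, scheduling succeeds.
            caughtByListenerThread = true;
        }
        Throwable captured = null;
        try {
            release.get(); // wait only so that the demo is deterministic
        } catch (ExecutionException e) {
            captured = e.getCause(); // the failure stayed with the scheduled task
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return new Outcome(caughtByListenerThread, captured);
    }

    public static void main(String[] args) {
        Outcome outcome = simulateGroupTimeoutRelease();
        System.out.println("caught by listener thread: " + outcome.caughtByListenerThread);
        System.out.println("failure on scheduler thread: " + outcome.capturedOnSchedulerThread);
    }
}
```

    The listener's catch block never fires; the failure only exists on the scheduler thread, which is why the adapter's errorChannel never sees it.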

    Feel free to raise a GH issue so we can consider populating that errorChannel into the message headers from MessageProducerSupport implementations, like that Kafka.messageDrivenChannelAdapter. That way, error handling would be the same regardless of the async nature of the downstream flow.

    UPDATE

    Please try this as a solution:

    .transform(Transformers.fromJson(MyDataObject.class))  // An exception here is sent to errorChannel
    .enrichHeaders(headers -> headers.header(MessageHeaders.ERROR_CHANNEL, errorHandler.getInputChannel()))
    .aggregate(aggregatorSpec ->
    

    The enrichHeaders() call should do the trick: it stores the target error channel in the message headers, so the error handling can determine the proper channel to send the error to.
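    A rough plain-Java model of why the header matters, assuming the error handler on the other thread behaves like a publishing error handler: it looks for an errorChannel header on the failed message and otherwise falls back to a default (global) error channel. The queues stand in for message channels; Msg, PublishingErrorHandler and routeError() are hypothetical names, not Spring API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class ErrorChannelHeaderDemo {

    // Same header name as MessageHeaders.ERROR_CHANNEL.
    static final String ERROR_CHANNEL = "errorChannel";

    /** Hypothetical stand-in for a Message: payload plus headers. */
    static final class Msg {
        final Object payload;
        final Map<String, Object> headers;

        Msg(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    /** Hypothetical publishing error handler: header channel first, default channel otherwise. */
    static final class PublishingErrorHandler {
        private final BlockingQueue<Throwable> defaultErrorChannel;

        PublishingErrorHandler(BlockingQueue<Throwable> defaultErrorChannel) {
            this.defaultErrorChannel = defaultErrorChannel;
        }

        @SuppressWarnings("unchecked")
        void handleError(Msg failedMessage, Throwable error) {
            Object fromHeader = failedMessage.headers.get(ERROR_CHANNEL);
            BlockingQueue<Throwable> target = fromHeader instanceof BlockingQueue
                    ? (BlockingQueue<Throwable>) fromHeader
                    : defaultErrorChannel;
            target.add(error);
        }
    }

    /** Models .enrichHeaders(...): copies the message and adds the errorChannel header. */
    static Msg enrichWithErrorChannel(Msg in, BlockingQueue<Throwable> errorChannel) {
        Map<String, Object> headers = new HashMap<>(in.headers);
        headers.put(ERROR_CHANNEL, errorChannel);
        return new Msg(in.payload, headers);
    }

    /** Returns "global" or "mine" depending on where the error was published. */
    static String routeError(boolean enrichBeforeAggregator) {
        BlockingQueue<Throwable> globalErrorChannel = new LinkedBlockingQueue<>();
        BlockingQueue<Throwable> myErrorHandlerInput = new LinkedBlockingQueue<>();
        PublishingErrorHandler handler = new PublishingErrorHandler(globalErrorChannel);

        Msg message = new Msg("{\"id\":1}", new HashMap<>());
        if (enrichBeforeAggregator) {
            message = enrichWithErrorChannel(message, myErrorHandlerInput);
        }
        handler.handleError(message, new IllegalStateException("transformer failed"));
        return myErrorHandlerInput.isEmpty() ? "global" : "mine";
    }

    public static void main(String[] args) {
        System.out.println("without header: " + routeError(false));
        System.out.println("with header:    " + routeError(true));
    }
}
```

    Without the header, the error lands in the global channel (the same fallback that makes the QueueChannel variant appear to work); with it, the error lands in the channel captured by enrichHeaders().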

    Plus, your MyDataObjectTransformer has to be modified so that the exception carries the failed message:

       throw new MessageTransformationException(source, "test");
    

    The point is that there is logic like this when the exception is caught while dispatching to the endpoint:

    if (handler != null) {
        try {
            handler.handleMessage(message);
            return true;
        }
        catch (Exception e) {
            throw IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message,
                    () -> "Dispatcher failed to deliver Message", e);
        }
    }
    

    where wrapInDeliveryExceptionIfNecessary() does this:

    if (!(ex instanceof MessagingException) ||
            ((MessagingException) ex).getFailedMessage() == null) {
        runtimeException = new MessageDeliveryException(message, text.get(), ex);
    }
    

    And then in the AbstractCorrelatingMessageHandler:

    catch (MessageDeliveryException ex) {
        logger.warn(ex, () ->
                "The MessageGroup [" + groupId +
                        "] is rescheduled by the reason of: ");
        scheduleGroupToForceComplete(groupId);
    }
    

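    That's why your exception never reaches the error channel: a MessageTransformationException that carries no failed message is wrapped into a MessageDeliveryException, and the aggregator treats that as a reason to reschedule the group (logging only a warning) rather than as an error. Alternatively, consider not throwing MessageTransformationException yourself at all; the wrapping handler then wraps any other exception together with the failed message. Its logic is like this: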

    protected Object handleRequestMessage(Message<?> message) {
        try {
            return this.transformer.transform(message);
        }
        catch (Exception e) {
            if (e instanceof MessageTransformationException) { // NOSONAR
                throw (MessageTransformationException) e;
            }
            throw new MessageTransformationException(message, "Failed to transform Message in " + this, e);
        }
    }
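    The interplay of these three excerpts can be modeled in plain Java. The exception classes below are hypothetical stand-ins for MessagingException, MessageTransformationException and MessageDeliveryException, reduced to a failedMessage field; the control flow follows the excerpts above but is a sketch, not the framework's code.

```java
import java.util.function.Function;

final class TransformerExceptionDemo {

    /** Hypothetical stand-in for MessagingException: only failedMessage matters here. */
    static class MessagingEx extends RuntimeException {
        final Object failedMessage; // null when created without the message

        MessagingEx(Object failedMessage, String text, Throwable cause) {
            super(text, cause);
            this.failedMessage = failedMessage;
        }
    }

    /** Stand-in for MessageTransformationException. */
    static final class TransformationEx extends MessagingEx {
        TransformationEx(Object failedMessage, String text, Throwable cause) {
            super(failedMessage, text, cause);
        }
    }

    /** Stand-in for MessageDeliveryException. */
    static final class DeliveryEx extends MessagingEx {
        DeliveryEx(Object failedMessage, String text, Throwable cause) {
            super(failedMessage, text, cause);
        }
    }

    /** Follows the handleRequestMessage() excerpt: transformation exceptions are rethrown as-is. */
    static Object transformingHandler(Object message, Function<Object, Object> transformer) {
        try {
            return transformer.apply(message);
        } catch (TransformationEx e) {
            throw e;
        } catch (RuntimeException e) {
            throw new TransformationEx(message, "Failed to transform Message", e);
        }
    }

    /** Follows the wrapInDeliveryExceptionIfNecessary() excerpt. */
    static RuntimeException wrapIfNecessary(Object message, RuntimeException ex) {
        if (!(ex instanceof MessagingEx) || ((MessagingEx) ex).failedMessage == null) {
            return new DeliveryEx(message, "Dispatcher failed to deliver Message", ex);
        }
        return ex;
    }

    /** One group release: "delivered", "rescheduled" (warning only) or "propagated". */
    static String releaseGroup(Function<Object, Object> transformer) {
        Object message = "aggregated group";
        try {
            try {
                transformingHandler(message, transformer); // dispatcher: handler.handleMessage(message)
                return "delivered";
            } catch (RuntimeException e) {
                throw wrapIfNecessary(message, e);
            }
        } catch (DeliveryEx ex) {
            return "rescheduled"; // aggregator: catch (MessageDeliveryException) -> reschedule group
        } catch (MessagingEx ex) {
            return "propagated"; // reaches the error handling of the calling thread
        }
    }

    public static void main(String[] args) {
        System.out.println(releaseGroup(m -> { throw new TransformationEx(null, "test", null); }));
        System.out.println(releaseGroup(m -> { throw new TransformationEx(m, "test", null); }));
        System.out.println(releaseGroup(m -> { throw new IllegalStateException("test"); }));
    }
}
```

    Only the first case, a transformation exception created without the failed message, ends in the rescheduling branch; passing the message, or throwing some other exception, lets the error propagate.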
    

    UPDATE 2

    OK. I see that you use Spring Boot, which does not register the corresponding ErrorHandler on the TaskScheduler that the aggregator uses for the group timeout feature.

    Please consider adding this bean to your configuration:

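    // Inside a @Configuration class. TaskSchedulerCustomizer is org.springframework.boot.task.TaskSchedulerCustomizer,
    // ErrorHandler is org.springframework.util.ErrorHandler; the parameter name matches the Spring Integration bean name.
    @Bean
    TaskSchedulerCustomizer taskSchedulerCustomizer(ErrorHandler integrationMessagePublishingErrorHandler) {
        return taskScheduler -> taskScheduler.setErrorHandler(integrationMessagePublishingErrorHandler);
    }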
    

    Then feel free to raise a GH issue for Spring Boot to make this customization the default in its auto-configuration.
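    The effect of that customizer can be sketched with a plain-JDK model: a hypothetical ErrorHandlingScheduler decorates each scheduled task so that a failure is handed to a configured error handler (here, a queue standing in for the error channel). The default handler in this model only logs. This illustrates the idea behind setErrorHandler(); it is not ThreadPoolTaskScheduler's implementation.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class SchedulerErrorHandlerDemo {

    /** Hypothetical scheduler that decorates every task with a configurable error handler. */
    static final class ErrorHandlingScheduler {
        private final ScheduledExecutorService delegate = Executors.newSingleThreadScheduledExecutor();
        // Default in this model: only log, like a scheduler with no publishing handler wired in.
        private volatile Consumer<Throwable> errorHandler =
                t -> System.err.println("Unexpected error in scheduled task: " + t);

        void setErrorHandler(Consumer<Throwable> errorHandler) {
            this.errorHandler = errorHandler;
        }

        ScheduledFuture<?> schedule(Runnable task, long delayMillis) {
            Consumer<Throwable> handler = this.errorHandler;
            return delegate.schedule(() -> {
                try {
                    task.run();
                } catch (RuntimeException e) {
                    handler.accept(e); // hand the failure over instead of losing it
                }
            }, delayMillis, TimeUnit.MILLISECONDS);
        }

        void shutdown() {
            delegate.shutdownNow();
        }
    }

    /** Returns how many errors reached the (modeled) error channel. */
    static int errorsPublished(boolean configureErrorHandler) {
        BlockingQueue<Throwable> errorChannel = new LinkedBlockingQueue<>();
        ErrorHandlingScheduler scheduler = new ErrorHandlingScheduler();
        if (configureErrorHandler) {
            // Analogue of taskScheduler.setErrorHandler(integrationMessagePublishingErrorHandler).
            scheduler.setErrorHandler(errorChannel::add);
        }
        try {
            scheduler.schedule(() -> {
                throw new IllegalStateException("transformer failed after group timeout");
            }, 10).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new IllegalStateException("the decorated task should never fail", e);
        } finally {
            scheduler.shutdown();
        }
        return errorChannel.size();
    }

    public static void main(String[] args) {
        System.out.println("without error handler: " + errorsPublished(false));
        System.out.println("with error handler:    " + errorsPublished(true));
    }
}
```

    With the handler configured, the failure from the scheduler thread is published once; without it, the error is only logged and never reaches the channel.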