Search code examples
javaspringspring-integrationspring-integration-dsl

Spring integration MessagePublishingErrorHandler not being invoked


I am trying to publish my exceptions to a specific error channel. I use a MessagePublishingErrorHandler for the same.

But my errors are not being routed to the channel I create.

Here is my code:-

channel:-

@Bean(value = "appErrorChannel")
    public PublishSubscribeChannel appErrorChannel() {
        return new PublishSubscribeChannel();
    }

MessageHandler:

@Bean
    public MessagePublishingErrorHandler myMessagePublishingErrorHandler(@Qualifier("appErrorChannel") PublishSubscribeChannel errChannel) {
        MessagePublishingErrorHandler messagePublishingErrorHandler = new MessagePublishingErrorHandler();
        messagePublishingErrorHandler.setDefaultErrorChannel(errChannel);
        return messagePublishingErrorHandler;
    }

FLow from this channel:-

@Bean
public IntegrationFlow errorFlow(@Qualifier("appErrorChannel") PublishSubscribeChannel errChannel) {
    return flow -> flow.channel(errChannel).
            publishSubscribeChannel(spec -> spec
                    .subscribe(f1 -> f1.handle(m -> System.out.println("******************* "+ m.getPayload()))));
}

My original channel. The act method throws an exception:-

@Bean
    public IntegrationFlow pubSubFlow(PublishSubscribeChannel publishSubscribeChannel, 
                                      @Qualifier("myMessagePublishingErrorHandler")
            MessagePublishingErrorHandler messagePublishingErrorHandler) {
        return flow -> flow.channel(publishSubscribeChannel).split()
                .channel(new ExecutorSubscribableChannel(Executors.newFixedThreadPool(1)))
                .publishSubscribeChannel(config -> config
                .subscribe(f1 -> f1.handle("action", "act")
                        .handle(m1 -> {System.out.println(">>>"+m1);}))
                        .errorHandler(messagePublishingErrorHandler)
                );
    }

Solution

  • The problem that errorHandler is used only with the taskExecutor in the PublishSubscribeChannel. Otherwise it is just ignored. We probably need to WARN on the matter in during PublishSubscribeChannel initialization. Feel free to raise a JIRA on the matter.

    As a workaround you can do something like this:

    .publishSubscribeChannel(new SyncTaskExecutor(), config -> config
    

    This way you still in the same calling thread, but the Runnable.run() is going to be really wrapped to the try...catch() and an ErrorMessage is going to be published to your appErrorChannel.