
Reactive DSL Error Handling with Flux Channels


I have a partially reactive flow that reads from SQS, performs some logic, and saves to a database (R2DBC). The flow runs on a reactive channel, which is the output channel of the SqsMessageDrivenChannelAdapter.

The Issue:

Exceptions thrown in the handle method (.handle((payload, header) -> validator.validate((Dto) payload))) do not reach the flowErrorChannel, so the errorProcessingFlow is not triggered. I need the errorProcessingFlow to log the exception and rethrow it to the SimpleMessageListenerContainer.

The same flow works as expected if I change objectChannel and flowErrorChannel from flux to direct, but not with flux channels. With flux channels the exception is not even propagated to the SimpleMessageListenerContainer: no redrive is triggered as per the SQS queue config, and the exception from the handler is only logged.

Here is the exception and the flow config:

2021-05-28 12:40:34.772 ERROR 59097 --- [enerContainer-2] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [ServiceActivator for [*********]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:450)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:324)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:267)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:231)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:140)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:88)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:37)
    at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:280)
    at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:261)
    at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
    at reactor.core.publisher.FluxRefCount$RefCountInner.onNext(FluxRefCount.java:199)
    at reactor.core.publisher.FluxPublish$PublishSubscriber.drain(FluxPublish.java:477)
    at reactor.core.publisher.FluxPublish$PublishSubscriber.onNext(FluxPublish.java:268)
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
    at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
    at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
    at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:97)
    at org.springframework.integration.channel.FluxMessageChannel.tryEmitMessage(FluxMessageChannel.java:79)
    at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:68)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
    at org.springframework.integration.aws.inbound.SqsMessageDrivenChannelAdapter.access$400(SqsMessageDrivenChannelAdapter.java:60)
    at org.springframework.integration.aws.inbound.SqsMessageDrivenChannelAdapter$IntegrationQueueMessageHandler.handleMessageInternal(SqsMessageDrivenChannelAdapter.java:194)
    at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:454)
    at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer.executeMessage(SimpleMessageListenerContainer.java:228)
    at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$MessageExecutor.run(SimpleMessageListenerContainer.java:418)
    at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable.run(SimpleMessageListenerContainer.java:310)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
    at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: javax.validation.ConstraintViolationException: XXXXXX
    at validator.validate(Validator.java:33)


    @Bean
    public IntegrationFlow validationAndProcessingFlow() {
        return IntegrationFlows.from(objectChannel())
                .log(
                        Level.INFO,
                        message -> "Object Channel Received Message : " + message.getPayload())
                .transform(Transformers.fromJson(Dto.class))
                .handle((payload, header) -> validator.validate((Dto) payload))
                .handle((payload, header) -> mapper.toMyObject((Dto) payload))
                .handle((payload, header) -> service.process((MyObject) payload))
                .handle(
                        (payload, header) ->
                                adapter
                                        .save((MyObject) payload)
                                        .as(create(transactionManager)::transactional))
                .log(
                        Level.INFO,
                        message -> "Persisted Message : " + message.getPayload())
                .get();
    }


    @Bean
    public MessageProducer createSqsMessageDrivenChannelAdapter(
            @Value("${queue.inbound.name}") final String queueName,
            @Value("${queue.inbound.visibility-timeout}") final Integer visibilityTimeout,
            @Value("${queue.inbound.wait-timeout}") final Integer waitTimeout,
            @Value("${queue.inbound.max-number-of-messages}")
                    final Integer maxNumberMessages) {
        SqsMessageDrivenChannelAdapter adapter =
                new SqsMessageDrivenChannelAdapter(this.amazonSqs, queueName);
        adapter.setVisibilityTimeout(visibilityTimeout);
        adapter.setWaitTimeOut(waitTimeout);
        adapter.setAutoStartup(true);
        adapter.setMaxNumberOfMessages(maxNumberMessages);
        adapter.setErrorChannel(flowErrorChannel());
        adapter.setOutputChannel(objectChannel());
        adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.NO_REDRIVE);
        return adapter;
    }

    @Bean
    public IntegrationFlow errorProcessingFlow() {
        return IntegrationFlows.from(flowErrorChannel())
                .log(Level.ERROR)
                .handle(
                        m -> {
                            throw (RuntimeException) (m.getPayload());
                        })
                .get();
    }

    @Bean
    public MessageChannel objectChannel() {
        return MessageChannels.flux().get();
    }

    @Bean
    public MessageChannel flowErrorChannel() {
        return MessageChannels.flux().get();
    }


Solution

  • This behavior is expected: once the flow switches to asynchronous (reactive) handling, the source of the messages loses its try..catch around the downstream processing, so an exception thrown in a handler is no longer propagated back to the listener container.

    To learn more, see the Reactor documentation on error handling: https://projectreactor.io/docs/core/release/reference/#error.handling.

    Until we can come up with some reactive solution for SQS, there is no way to handle your problem other than turning that channel back to direct.
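
    The lost try..catch can be illustrated with a small plain-Java sketch. It does not use Spring Integration or Reactor; every name in it (ChannelErrorPropagationSketch, directStyle, subscriberStyle, sourceSeesFailure) is hypothetical and only models the two hand-off styles under simplified assumptions. A direct-style channel runs the handler inside the sender's call, so the exception travels back up the stack. A subscriber-style channel treats a handler failure as a signal for the subscriber side (here it is simply collected, much as the real flow only logs it) and returns normally to the sender.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ChannelErrorPropagationSketch {

    /** Hypothetical stand-in for the validation step that rejects a message. */
    static void validate(String payload) {
        if (payload.isEmpty()) {
            throw new IllegalArgumentException("payload must not be empty");
        }
    }

    /** Direct-style channel: the handler runs inside send(), so its exception reaches the sender. */
    static Consumer<String> directStyle(Consumer<String> handler) {
        return handler;
    }

    /** Subscriber-style channel: a failure is passed to onError and is not rethrown to the sender. */
    static Consumer<String> subscriberStyle(Consumer<String> handler, Consumer<Throwable> onError) {
        return payload -> {
            try {
                handler.accept(payload);
            } catch (RuntimeException e) {
                onError.accept(e);
            }
        };
    }

    /** Stand-in for the message source: it can only react to what send() throws back at it. */
    static boolean sourceSeesFailure(Consumer<String> channel, String payload) {
        try {
            channel.accept(payload);
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<Throwable> handledDownstream = new ArrayList<>();
        Consumer<String> direct = directStyle(ChannelErrorPropagationSketch::validate);
        Consumer<String> reactive =
                subscriberStyle(ChannelErrorPropagationSketch::validate, handledDownstream::add);

        System.out.println("direct, source sees failure: " + sourceSeesFailure(direct, ""));
        System.out.println("subscriber-style, source sees failure: " + sourceSeesFailure(reactive, ""));
        System.out.println("failures handled downstream: " + handledDownstream.size());
    }
}
```

    Running main prints true for the direct style, false for the subscriber style, and a downstream count of 1. In the flow from the question, the direct-style case corresponds to declaring objectChannel and flowErrorChannel with MessageChannels.direct() instead of MessageChannels.flux(), which is the variant the question reports as working.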