I have a partially reactive flow that reads from SQS, performs some logic, and saves to a DB (R2DBC). The flow runs on a reactive channel, which is the channel that SqsMessageDrivenChannelAdapter sends inbound messages to.
The issue:
Exceptions thrown in the handle method (.handle((payload, header) -> validator.validate((Dto) payload))) do not reach the flowErrorChannel, so the errorProcessingFlow is not triggered. I need the errorProcessingFlow to log the exception and rethrow it to SimpleMessageListenerContainer.
The same setup works as expected if I change objectChannel and flowErrorChannel from flux to direct, but not with flux channels. With flux channels the exception is not even propagated to SimpleMessageListenerContainer: no redrive is triggered as per the SQS queue config, and the exception from handle is only logged.
Here is the exception and the flow config:
2021-05-28 12:40:34.772 ERROR 59097 --- [enerContainer-2] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [ServiceActivator for [*********]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:450)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:324)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:267)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:231)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:140)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:88)
at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:37)
at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:280)
at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:261)
at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
at reactor.core.publisher.FluxRefCount$RefCountInner.onNext(FluxRefCount.java:199)
at reactor.core.publisher.FluxPublish$PublishSubscriber.drain(FluxPublish.java:477)
at reactor.core.publisher.FluxPublish$PublishSubscriber.onNext(FluxPublish.java:268)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:97)
at org.springframework.integration.channel.FluxMessageChannel.tryEmitMessage(FluxMessageChannel.java:79)
at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:68)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at org.springframework.integration.aws.inbound.SqsMessageDrivenChannelAdapter.access$400(SqsMessageDrivenChannelAdapter.java:60)
at org.springframework.integration.aws.inbound.SqsMessageDrivenChannelAdapter$IntegrationQueueMessageHandler.handleMessageInternal(SqsMessageDrivenChannelAdapter.java:194)
at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:454)
at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer.executeMessage(SimpleMessageListenerContainer.java:228)
at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$MessageExecutor.run(SimpleMessageListenerContainer.java:418)
at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable.run(SimpleMessageListenerContainer.java:310)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: javax.validation.ConstraintViolationException: XXXXXX
at validator.validate(Validator.java:33)
@Bean
public IntegrationFlow validationAndProcessingFlow() {
    return IntegrationFlows.from(objectChannel())
        .log(
            Level.INFO,
            message -> "Object Channel Received Message : " + message.getPayload())
        .transform(Transformers.fromJson(Dto.class))
        .handle((payload, header) -> validator.validate((Dto) payload))
        .handle((payload, header) -> mapper.toMyObject((Dto) payload))
        .handle((payload, header) -> service.process((MyObject) payload))
        .handle(
            (payload, header) ->
                adapter
                    .save((MyObject) payload)
                    .as(create(transactionManager)::transactional))
        .log(
            Level.INFO,
            message -> "Persisted Message : " + message.getPayload())
        .get();
}
@Bean
public MessageProducer createSqsMessageDrivenChannelAdapter(
        @Value("${queue.inbound.name}") final String queueName,
        @Value("${queue.inbound.visibility-timeout}") final Integer visibilityTimeout,
        @Value("${queue.inbound.wait-timeout}") final Integer waitTimeout,
        @Value("${queue.inbound.max-number-of-messages}")
        final Integer maxNumberMessages) {
    SqsMessageDrivenChannelAdapter adapter =
        new SqsMessageDrivenChannelAdapter(this.amazonSqs, queueName);
    adapter.setVisibilityTimeout(visibilityTimeout);
    adapter.setWaitTimeOut(waitTimeout);
    adapter.setAutoStartup(true);
    adapter.setMaxNumberOfMessages(maxNumberMessages);
    adapter.setErrorChannel(flowErrorChannel());
    adapter.setOutputChannel(objectChannel());
    adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.NO_REDRIVE);
    return adapter;
}
@Bean
public IntegrationFlow errorProcessingFlow() {
    return IntegrationFlows.from(flowErrorChannel())
        .log(Level.ERROR)
        .handle(
            m -> {
                throw (RuntimeException) (m.getPayload());
            })
        .get();
}
@Bean
public MessageChannel objectChannel() {
    return MessageChannels.flux().get();
}
@Bean
public MessageChannel flowErrorChannel() {
    return MessageChannels.flux().get();
}
This behavior is expected: once we switch to async handling, we lose the try...catch feature in the source of messages, so an exception thrown downstream no longer propagates back to the sender.
You can learn more about error handling in Reactor here: https://projectreactor.io/docs/core/release/reference/#error.handling.
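To make the point about the lost try...catch concrete, here is a minimal plain-Java analogy. It is not Spring Integration's or Reactor's actual code; the class and method names (DispatchDemo, sendDirect, sendViaSubscriber) are hypothetical. It only contrasts a direct-style dispatch, where the handler runs inside the sender's call, with a subscriber-style dispatch, where a handler failure is routed to an error hook instead of being rethrown to the sender.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class DispatchDemo {

    /** Direct-style dispatch: the handler runs inside the sender's call,
     *  so any exception it throws propagates back to the sender. */
    static void sendDirect(String payload, Consumer<String> handler) {
        handler.accept(payload);
    }

    /** Subscriber-style dispatch (analogy only): a handler failure is caught
     *  and passed to an error hook, so the sender never sees it. */
    static void sendViaSubscriber(
            String payload, Consumer<String> handler, Consumer<Throwable> onError) {
        try {
            handler.accept(payload);
        } catch (RuntimeException e) {
            onError.accept(e);
        }
    }

    /** Returns true if the sender's own try/catch observed the handler's exception. */
    static boolean senderSeesFailure(boolean direct, List<String> errorLog) {
        Consumer<String> failingHandler = p -> {
            throw new IllegalStateException("invalid: " + p);
        };
        try {
            if (direct) {
                sendDirect("msg", failingHandler);
            } else {
                sendViaSubscriber("msg", failingHandler, e -> errorLog.add(e.getMessage()));
            }
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        System.out.println("direct: sender sees failure = " + senderSeesFailure(true, log));
        System.out.println("subscriber: sender sees failure = " + senderSeesFailure(false, log));
        System.out.println("logged only: " + log);
    }
}
```

In the second case the sender returns normally, which corresponds to what the question describes: the listener container sees a successful send, so nothing triggers a redrive, and the failure only shows up in the log.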
Until we come up with a reactive solution for SQS, there is no way to handle your problem other than turning that channel back to direct.
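As a sketch of that workaround, assuming the same Spring Integration Java DSL used in the question, the two channel beans could be declared as direct channels. The enclosing class name ChannelConfig is hypothetical; the bean names are the ones from the question.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.MessageChannel;

@Configuration
public class ChannelConfig { // hypothetical class name

    // Direct channel: subscribers are invoked in the sender's thread,
    // so a handler exception propagates back to the adapter's send call.
    @Bean
    public MessageChannel objectChannel() {
        return MessageChannels.direct().get();
    }

    // Direct error channel, so the exception rethrown by errorProcessingFlow
    // also travels back up the caller's stack.
    @Bean
    public MessageChannel flowErrorChannel() {
        return MessageChannels.direct().get();
    }
}
```

This matches the configuration the question reports as working: the error reaches flowErrorChannel, and the exception rethrown there reaches SimpleMessageListenerContainer.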