Search code examples
spring-integrationspring-jmsspring-retry

Spring Retry Recovery attached to a service activator is not part of the processing chain


I am using a spring integration chain of service activators to process an incoming message from a queue. One of the service activators messagePersister that persists the incoming message. If this service activator fails, there is a retry advice that tries the operation 3 more times. This part works fine, but if all retries fail, we have a recovery method that persists the message in alternate form (also triggers some notifications etc). This recovery method and the orginal persister method return objects of the same class that then need to be processed by the preprocessor - the next service activator in the chain. However, it looks like, using the recovery options causes the message to leave the chain, and the return object from the recovery service activator does not go down the chain. Similarly, if the recovery method throws an exception, it does not go to the redboxExceptionChannel which is the exception for the adapter that is listening to the incoming queue.

 <int-jms:message-driven-channel-adapter
    id="inputChannelAdapter"
    connection-factory="jmsConnectionFactory"
    destination-name="REDBOX_IN"
    channel="redboxIncomingChannel"
    max-concurrent-consumers="1"
    auto-startup="true"
    acknowledge="transacted"
    receive-timeout="20000"
    error-channel="redboxExceptionChannel"/>

  <int:chain id="redboxIncomingChannelProcessingChain"
    input-channel="redboxIncomingChannel"
    output-channel="redboxOutgoingMessageChannel">
    <int:service-activator ref="messagePersister"
      method="persistAndAddClientMessageIdToHeader">
      <int:request-handler-advice-chain>
        <int:retry-advice max-attempts="4" recovery-channel="persistenceRetriesExhaustedChannel" >
          <int:exponential-back-off initial="800"
            multiplier="3"
            maximum="25000"/>
        </int:retry-advice>
      </int:request-handler-advice-chain>
    </int:service-activator>
    <int:service-activator ref="redboxPreProcessor" method="validate"/>
    <int:service-activator ref="redboxProcessor" method="evaluateRules"/>
  </int:chain>

  <int:service-activator ref="messagePersister"
    method="retriesExhausted" input-channel="persistenceRetriesExhaustedChannel"  />

I was expecting the recovery method to be part of the chain that triggered the retries.


Solution

  • The behavior is correct. The ErrorMessageSendingRecoverer has the logic like this:

    @Override
    public Object recover(RetryContext context) {
        publish(context.getLastThrowable(), context);
        return null;
    }
    

    So, it just does not return. There is no knowledge at that point that your service activator is reply producing.

    You can fix the problem this way:

    add a <gateway> at that point and extract your service-activator with retry into an independent component with an input-channel as the request-channel from the mentioned gateway.

    Then your messagePersister.retriesExhausted must look into a MessagingException.failedMessage to copy its headers before returning from this method. This way the replyChannel will be present and endpoint would know where to send a result of your method. This replyChannel is where that gateway is waiting for reply. So, you got a normal reply from original service activator and compensation one from persistenceRetriesExhaustedChannel subscriber.

    UPDATE

    Regarding errors from the recoverer sub-flow. According to my testing it works as expected:

    @SpringBootApplication
    public class So78089892Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So78089892Application.class, args);
        }
    
        @Bean
        ApplicationRunner sendToJms(JmsTemplate jmsTemplate) {
            return args -> jmsTemplate.convertAndSend("REDBOX_IN", "test data");
        }
    
        @Bean
        JmsMessageDrivenChannelAdapterSpec<?> inputChannelAdapter(ConnectionFactory jmsConnectionFactory) {
            return Jms.messageDrivenChannelAdapter(jmsConnectionFactory)
                    .destination("REDBOX_IN")
                    .outputChannel("redboxIncomingChannel")
                    .errorChannel("redboxExceptionChannel");
        }
    
        @ServiceActivator(inputChannel = "redboxExceptionChannel")
        void handleErrors(Exception exception) {
            System.out.println("Error Received: \n" + exception);
        }
    
        @Bean
        IntegrationFlow redboxIncomingChannelProcessingChain(RequestHandlerRetryAdvice retryAdvice) {
            return IntegrationFlow
                    .from("redboxIncomingChannel")
                    .gateway((subFlow) -> subFlow
                            .handle((p, h) -> {
                                throw new RuntimeException("Persistence failed");
                            }), e -> e.advice(retryAdvice))
                    .get();
        }
    
        @Bean
        RequestHandlerRetryAdvice retryAdvice(MessageChannel persistenceRetriesExhaustedChannel) {
            RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
            requestHandlerRetryAdvice.setRecoveryCallback(
                    new ErrorMessageSendingRecoverer(persistenceRetriesExhaustedChannel));
            return requestHandlerRetryAdvice;
    
        }
    
        @Bean
        DirectChannel persistenceRetriesExhaustedChannel() {
            return new DirectChannel();
        }
    
        @ServiceActivator(inputChannel = "persistenceRetriesExhaustedChannel")
        void retriesExhausted(Exception exception) {
            throw new RuntimeException("Cannot recover", exception);
        }
    
    }
    

    As you see in the last retriesExhausted() method I deliberately throw some exception based on the one coming from just failed handler with retry advice.

    In the end I got logs from that handleErrors() method like this:

    Error Received: 
    org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@53b41cc8], failedMessage=ErrorMessage [payload=org.springframework.messaging.MessagingException: Failed to handle, failedMessage=GenericMessage [payload=test data, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=ActiveMQQueue[REDBOX_IN], id=5ff5cbc6-f585-c113-a9c3-40a741d0cc7f, priority=4, jms_timestamp=1709326985025, jms_messageId=ID:18fa9c02-d80f-11ee-8409-00155d933a76, timestamp=1709326985042}], headers={id=9eb9a300-d058-da1c-8315-432c2ae0cb34, timestamp=1709326985055}]
    

    (Sorry for Java DSL variant: I haven't worked with XML config for a while).

    We might have some difference in the configuration. For example, your persistenceRetriesExhaustedChannel is not a DirectChannel...