Search code examples
javaspringspring-integration

Message Group expiring 2 times


I have a spring flow where i am aggregating message. I am using time out to expire the group.

<int:aggregator input-channel="inputChannel" expire-groups-upon-completion="true" expire-groups-upon-timeout="true" discard-channel="timeoutChannel"  group-timeout="10000" correlation-strategy-expression="headers['id']" output-channel="release"/> 

in timeoutChannel i am sending email when any group is timeout. While seting email it fails( not able to connect to server that is fine) and send to error channel where i am logging the error.

I am not able figure out why its expiring message 2 times with same log message?

Error Log

2021-05-27 17:53:46,213 DEBUG [task-scheduler-1] org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler:528 - Cancel 'forceComplete' scheduling for MessageGroup [ SimpleMessageGroup{groupId=d02b06cb-1c59-2d87-a3d6-080de89799e4, 
2021-05-27 17:53:46,215 INFO  [task-scheduler-1] org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler:632 - Expiring MessageGroup with correlationKey[d02b06cb-1c59-2d87-a3d6-080de89799e4] 
2021-05-27 17:53:46,215 DEBUG [task-scheduler-1] org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler:643 - Discarding messages of partially complete group with key [d02b06cb-1c59-2d87-a3d6-080de89799e4] to: aggregatorTimeoutChannel 
2021-05-27 17:53:46,215 DEBUG [task-scheduler-1] org.springframework.integration.channel.DirectChannel:413 - preSend on channel 'aggregatorTimeoutChannel', 
2021-05-27 17:53:46,216 DEBUG [task-scheduler-1] org.springframework.integration.channel.DirectChannel:413 - preSend on channel 'aggregatorTimeoutAlert',
2021-05-27 17:53:46,218 DEBUG [task-scheduler-1] org.springframework.integration.filter.MessageFilter:115 - timeOutAlertMailChain$child#0.handler received message: GenericMessage [
2021-05-27 17:53:46,219 DEBUG [task-scheduler-1] org.springframework.integration.handler.ServiceActivatingHandler:115 - ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@18afd6e2] (timeOutAlertMailChain$child#1) received message:
2021-05-27 17:53:46,220 DEBUG [task-scheduler-1] org.springframework.integration.transformer.MessageTransformingHandler:115 - timeOutAlertMailChain$child#2.handler received message:
2021-05-27 17:53:46,221 DEBUG [task-scheduler-1] org.springframework.integration.transformer.MessageTransformingHandler:115 - timeOutAlertMailChain$child#3.handler received message: 
2021-05-27 17:53:46,222 DEBUG [task-scheduler-1] org.springframework.integration.transformer.MessageTransformingHandler:115 - timeOutAlertMailChain$child#4.handler received message: 
2021-05-27 17:53:46,222 DEBUG [task-scheduler-1] org.springframework.integration.transformer.MessageTransformingHandler:115 - timeOutAlertMailChain$child#5.handler received message: 
2021-05-27 17:53:46,225 DEBUG [task-scheduler-1] org.springframework.integration.mail.MailSendingMessageHandler:115 - timeOutAlertMailChain$child#6.handler received message: GenericM
2021-05-27 17:53:48,238 DEBUG [task-scheduler-1] org.springframework.integration.channel.DirectChannel:413 - preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [timeOutAlertMailChain$child#6.handler]; nested exception is org.springframework.mail.MailSendException: Mail server connection failed; nested exception is com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
  nested exception is:
    java.net.ConnectException: Connection refused: connect. Failed messages: com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
  nested exception is:
    2021-05-27 17:53:48,240 DEBUG [task-scheduler-1] org.springframework.integration.handler.MethodInvokingMessageHandler:115 - org.springframework.integration.handler.MethodInvokingMessageHandler#0 received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [timeOutAlertMailChain$child#6.handler]; nested exception is org.springframework.mail.MailSendException: Mail server connection failed; nested exception is com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
  nested exception is:
    java.net.ConnectException: Connection refused: connect. Failed messages: com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
  nested exception is:
    java.net.ConnectException: Connection refused: connect; message exceptions (1) are:
Failed message 1: com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
  nested exception is:
    java.net.ConnectException: Connection refused: connect, failedMessage=GenericMessage [payload=<?xml version="1.0" encoding="UTF-8"?><html xmlns="http://www.w3.org/TR/REC-html40" xmlns:o="urn:schemas-microsoft-com:office:office"><head><meta http-equiv="Content-Type" content="text/html; charset=us-ascii"/><meta name="Generator" content="Microsoft Word 14 (filtered medium)"/></head><body lang="EN-GB" link="blue" vlink="purple"><div class="WordSection1"><p class="MsoNormal"><span lang="EN-US" style="mso-fareast-language: EN-IE">
2021-05-27 17:53:48,240 ERROR [task-scheduler-1] com.examplet.ErrorHandler:26 - ERROR!!! 
error occurred in message handler [timeOutAlertMailChain$child#6.handler]; nested exception is org.springframework.mail.MailSendException: Mail server connection failed; nested exception is com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
  nested exception is:
    java.net.ConnectException: Connection refused: connect. Failed messages: com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
  nested exception is:
    java.net.ConnectException: Connection refused: connect; message exceptions (1) are:
Failed message 1: com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
  nested exception is:
    java.net.ConnectException: Connection refused: connect 
2021-05-27 17:53:48,241 DEBUG [task-scheduler-1] org.springframework.integration.channel.DirectChannel:432 - postSend (sent=true) on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [timeOutAlertMailChain$child#6.handler]; nested exception is org.springframework.mail.MailSendException: Mail server connection failed; nested exception is com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
  nested exception is:
    java.net.ConnectException: Connection refused: connect. Failed messages: com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
  nested exception is:
    java.net.ConnectException: Connection refused: connect; message exceptions (1) are:
Failed message 1: com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
  nested exception is:

Solution

  • You probably need to think about regular output emitting when expiration timeout happens. As you notices the send-partial-result-on-expiry="true" does the trick on the matter.

    You probably need to think do not make expire-groups-upon-timeout="true", but rather false. This way the expired group is not going to be removed from the store and all the late messages are going to be discarded since group is marked as complete.

    I don't think there is a point for you to look into that <int:expire-advice-chain> since only one message per group is going to be emitted to the outputChannel according that our send-partial-result-on-expiry="true".

    See more docs about an aggregator and what does it do: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator

    UPDATE

    If you wish to use an <int:expire-advice-chain>, you need to implement a MethodInterceptor like this:

    public class ExpiredGroupNotificationMethodInterceptor implements MethodInterceptor {
    
        @Nullable 
        @Override 
        public Object invoke(@NotNull MethodInvocation invocation) throws Throwable {
            MessageGroup expiredGroup = (MessageGroup) invocation.getArguments()[0];
            Collection<Message<?>> partiallyReleasedMessages = expiredGroup.getMessages();
            // Do some notification here
            return invocation.proceed();
        }
    
    }
    

    And add it as a <bean> for that chain:

    <expire-advice-chain>
        <beans:bean class="ExpiredGroupNotificationMethodInterceptor"/>
    </expire-advice-chain>
    

    As a nested configuration for your <aggregator> tag.