I have a spring flow where i am aggregating message. I am using time out to expire the group.
<int:aggregator input-channel="inputChannel" expire-groups-upon-completion="true" expire-groups-upon-timeout="true" discard-channel="timeoutChannel" group-timeout="10000" correlation-strategy-expression="headers['id']" output-channel="release"/>
in timeoutChannel i am sending email when any group is timeout. While seting email it fails( not able to connect to server that is fine) and send to error channel where i am logging the error.
I am not able figure out why its expiring message 2 times with same log message?
Error Log
2021-05-27 17:53:46,213 DEBUG [task-scheduler-1] org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler:528 - Cancel 'forceComplete' scheduling for MessageGroup [ SimpleMessageGroup{groupId=d02b06cb-1c59-2d87-a3d6-080de89799e4,
2021-05-27 17:53:46,215 INFO [task-scheduler-1] org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler:632 - Expiring MessageGroup with correlationKey[d02b06cb-1c59-2d87-a3d6-080de89799e4]
2021-05-27 17:53:46,215 DEBUG [task-scheduler-1] org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler:643 - Discarding messages of partially complete group with key [d02b06cb-1c59-2d87-a3d6-080de89799e4] to: aggregatorTimeoutChannel
2021-05-27 17:53:46,215 DEBUG [task-scheduler-1] org.springframework.integration.channel.DirectChannel:413 - preSend on channel 'aggregatorTimeoutChannel',
2021-05-27 17:53:46,216 DEBUG [task-scheduler-1] org.springframework.integration.channel.DirectChannel:413 - preSend on channel 'aggregatorTimeoutAlert',
2021-05-27 17:53:46,218 DEBUG [task-scheduler-1] org.springframework.integration.filter.MessageFilter:115 - timeOutAlertMailChain$child#0.handler received message: GenericMessage [
2021-05-27 17:53:46,219 DEBUG [task-scheduler-1] org.springframework.integration.handler.ServiceActivatingHandler:115 - ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@18afd6e2] (timeOutAlertMailChain$child#1) received message:
2021-05-27 17:53:46,220 DEBUG [task-scheduler-1] org.springframework.integration.transformer.MessageTransformingHandler:115 - timeOutAlertMailChain$child#2.handler received message:
2021-05-27 17:53:46,221 DEBUG [task-scheduler-1] org.springframework.integration.transformer.MessageTransformingHandler:115 - timeOutAlertMailChain$child#3.handler received message:
2021-05-27 17:53:46,222 DEBUG [task-scheduler-1] org.springframework.integration.transformer.MessageTransformingHandler:115 - timeOutAlertMailChain$child#4.handler received message:
2021-05-27 17:53:46,222 DEBUG [task-scheduler-1] org.springframework.integration.transformer.MessageTransformingHandler:115 - timeOutAlertMailChain$child#5.handler received message:
2021-05-27 17:53:46,225 DEBUG [task-scheduler-1] org.springframework.integration.mail.MailSendingMessageHandler:115 - timeOutAlertMailChain$child#6.handler received message: GenericM
2021-05-27 17:53:48,238 DEBUG [task-scheduler-1] org.springframework.integration.channel.DirectChannel:413 - preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [timeOutAlertMailChain$child#6.handler]; nested exception is org.springframework.mail.MailSendException: Mail server connection failed; nested exception is com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
nested exception is:
java.net.ConnectException: Connection refused: connect. Failed messages: com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
nested exception is:
2021-05-27 17:53:48,240 DEBUG [task-scheduler-1] org.springframework.integration.handler.MethodInvokingMessageHandler:115 - org.springframework.integration.handler.MethodInvokingMessageHandler#0 received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [timeOutAlertMailChain$child#6.handler]; nested exception is org.springframework.mail.MailSendException: Mail server connection failed; nested exception is com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
nested exception is:
java.net.ConnectException: Connection refused: connect. Failed messages: com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
nested exception is:
java.net.ConnectException: Connection refused: connect; message exceptions (1) are:
Failed message 1: com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
nested exception is:
java.net.ConnectException: Connection refused: connect, failedMessage=GenericMessage [payload=<?xml version="1.0" encoding="UTF-8"?><html xmlns="http://www.w3.org/TR/REC-html40" xmlns:o="urn:schemas-microsoft-com:office:office"><head><meta http-equiv="Content-Type" content="text/html; charset=us-ascii"/><meta name="Generator" content="Microsoft Word 14 (filtered medium)"/></head><body lang="EN-GB" link="blue" vlink="purple"><div class="WordSection1"><p class="MsoNormal"><span lang="EN-US" style="mso-fareast-language: EN-IE">
2021-05-27 17:53:48,240 ERROR [task-scheduler-1] com.examplet.ErrorHandler:26 - ERROR!!!
error occurred in message handler [timeOutAlertMailChain$child#6.handler]; nested exception is org.springframework.mail.MailSendException: Mail server connection failed; nested exception is com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
nested exception is:
java.net.ConnectException: Connection refused: connect. Failed messages: com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
nested exception is:
java.net.ConnectException: Connection refused: connect; message exceptions (1) are:
Failed message 1: com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
nested exception is:
java.net.ConnectException: Connection refused: connect
2021-05-27 17:53:48,241 DEBUG [task-scheduler-1] org.springframework.integration.channel.DirectChannel:432 - postSend (sent=true) on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [timeOutAlertMailChain$child#6.handler]; nested exception is org.springframework.mail.MailSendException: Mail server connection failed; nested exception is com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
nested exception is:
java.net.ConnectException: Connection refused: connect. Failed messages: com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
nested exception is:
java.net.ConnectException: Connection refused: connect; message exceptions (1) are:
Failed message 1: com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
nested exception is:
You probably need to think about regular output emitting when expiration timeout happens. As you notices the send-partial-result-on-expiry="true"
does the trick on the matter.
You probably need to think do not make expire-groups-upon-timeout="true"
, but rather false
. This way the expired group is not going to be removed from the store and all the late messages are going to be discarded since group is marked as complete
.
I don't think there is a point for you to look into that <int:expire-advice-chain>
since only one message per group is going to be emitted to the outputChannel
according that our send-partial-result-on-expiry="true"
.
See more docs about an aggregator and what does it do: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator
UPDATE
If you wish to use an <int:expire-advice-chain>
, you need to implement a MethodInterceptor
like this:
public class ExpiredGroupNotificationMethodInterceptor implements MethodInterceptor {
@Nullable
@Override
public Object invoke(@NotNull MethodInvocation invocation) throws Throwable {
MessageGroup expiredGroup = (MessageGroup) invocation.getArguments()[0];
Collection<Message<?>> partiallyReleasedMessages = expiredGroup.getMessages();
// Do some notification here
return invocation.proceed();
}
}
And add it as a <bean>
for that chain:
<expire-advice-chain>
<beans:bean class="ExpiredGroupNotificationMethodInterceptor"/>
</expire-advice-chain>
As a nested configuration for your <aggregator>
tag.