I use the Spring Integration aggregator pattern and store the groups in a JdbcMessageStore.
I know that .expireGroupsUponCompletion(true) and .expireGroupsUponTimeout(true) remove a group from the store.
If both of these properties are false, how can I additionally remove groups at a scheduled time, such as at the end of the day? Is it possible?
@Bean
public IntegrationFlow aggregatorSession(MessageGroupStore jdbcMessageStoreSession) {
    return IntegrationFlows.from(aggregatorSessionChannel())
            .aggregate(g -> g.poller(Pollers.fixedDelay(1000).errorChannel("error.input"))
                    .transactional(true)
                    .discardChannel("discardSession.input")
                    .sendPartialResultOnExpiry(true)
                    .expireGroupsUponCompletion(false)
                    .expireGroupsUponTimeout(false)
                    .messageStore(jdbcMessageStoreSession)
                    .correlationExpression("payload.item.ticket")
                    .releaseStrategy(r -> r.size() == 2)
                    .outputProcessor(outputSessionProcessor()))
            ...
}
@Bean
public MessageGroupStore jdbcMessageStoreSession(DataSource dataSource) {
    JdbcMessageStore jdbcMessageStore = new JdbcMessageStore(dataSource);
    jdbcMessageStore.setRegion("session");
    return jdbcMessageStore;
}

@Bean
public MessageGroupStoreReaper messageGroupStoreSessionReaper(MessageGroupStore jdbcMessageStoreSession) {
    MessageGroupStoreReaper messageGroupStoreReaper = new MessageGroupStoreReaper();
    messageGroupStoreReaper.setTimeout(3000);
    messageGroupStoreReaper.setMessageGroupStore(jdbcMessageStoreSession);
    return messageGroupStoreReaper;
}
@Service
@RequiredArgsConstructor
public class SchedulerReaperService {

    private final MessageGroupStoreReaper messageGroupStoreSessionReaper;

    @Scheduled(fixedRate = 2000, initialDelay = 1000)
    @Transactional
    public void poll() {
        messageGroupStoreSessionReaper.run();
    }
}
2022-04-04 12:07:38.377 DEBUG [scheduling-1] o.s.i.a.AggregatingMessageHandler : bean 'aggregatorSession.aggregator#0' for component 'aggregatorSession.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [sk/vub/quatro/integration/AmqpIntegration.class]'; from source: 'bean method aggregatorSession' received message: GenericMessage [payload=AggregatorWrap(item=DocumentDTO(ticket=d0643e59-4f37-4df6-8548-1be388cc895d, state=OPEN, signingSessionId=null, documentId=null, type=null, businessKey=85a6a6fd-f1fb-4599-9b0c-ea007f6f9f29, partyId=null), order=1), headers={nativeHeaders={}, id=b3114fd5-cb20-bf6f-0d02-1418660bbd41}]
2022-04-04 12:07:38.378 DEBUG [scheduling-1] o.s.i.a.AggregatingMessageHandler : Handling message with correlationKey [d0643e59-4f37-4df6-8548-1be388cc895d]: GenericMessage [payload=AggregatorWrap(item=DocumentDTO(ticket=d0643e59-4f37-4df6-8548-1be388cc895d, state=OPEN, signingSessionId=null, documentId=null, type=null, businessKey=85a6a6fd-f1fb-4599-9b0c-ea007f6f9f29, partyId=null), order=1), headers={nativeHeaders={}, id=b3114fd5-cb20-bf6f-0d02-1418660bbd41}]
2022-04-04 12:07:38.382 DEBUG [scheduling-1] o.s.i.a.AggregatingMessageHandler : bean 'aggregatorSession.aggregator#0' for component 'aggregatorSession.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [sk/vub/quatro/integration/AmqpIntegration.class]'; from source: 'bean method aggregatorSession' received message: GenericMessage [payload=AggregatorWrap(item=DocumentDTO(ticket=d0643e59-4f37-4df6-8548-1be388cc895d, state=OPEN, signingSessionId=e361ecee-3211-4f1d-b2de-fe002effa3fb, documentId=1, type=CONTRACT, businessKey=85a6a6fd-f1fb-4599-9b0c-ea007f6f9f29, partyId=284680~^~CIS), order=0), headers={nativeHeaders={}, documentType=CONTRACT, amqp_replyTo=event-exchange/event.response, amqp_correlationId=7ed32e1a-9887-4e0c-b9b0-0a12da6452f9, id=8521c14a-7a19-5b2c-3942-5e85bae0bd0e}]
2022-04-04 12:07:38.383 DEBUG [scheduling-1] o.s.i.a.AggregatingMessageHandler : Handling message with correlationKey [d0643e59-4f37-4df6-8548-1be388cc895d]: GenericMessage [payload=AggregatorWrap(item=DocumentDTO(ticket=d0643e59-4f37-4df6-8548-1be388cc895d, state=OPEN, signingSessionId=e361ecee-3211-4f1d-b2de-fe002effa3fb, documentId=1, type=CONTRACT, businessKey=85a6a6fd-f1fb-4599-9b0c-ea007f6f9f29, partyId=284680~^~CIS), order=0), headers={nativeHeaders={}, documentType=CONTRACT, amqp_replyTo=event-exchange/event.response, amqp_correlationId=7ed32e1a-9887-4e0c-b9b0-0a12da6452f9, id=8521c14a-7a19-5b2c-3942-5e85bae0bd0e}]
2022-04-04 12:07:38.385 DEBUG [scheduling-1] o.s.i.a.AggregatingMessageHandler : Completing group with correlationKey [d0643e59-4f37-4df6-8548-1be388cc895d]
2022-04-04 12:07:41.455 DEBUG [scheduling-1] o.s.i.a.AggregatingMessageHandler : Group expiry candidate (d0643e59-4f37-4df6-8548-1be388cc895d) has changed - it may be reconsidered for a future expiration
2022-04-04 12:07:43.477 DEBUG [scheduling-1] o.s.i.a.AggregatingMessageHandler : Group expiry candidate (d0643e59-4f37-4df6-8548-1be388cc895d) has changed - it may be reconsidered for a future expiration
2022-04-04 12:07:45.497 DEBUG [scheduling-1] o.s.i.a.AggregatingMessageHandler : Group expiry candidate (d0643e59-4f37-4df6-8548-1be388cc895d) has changed - it may be reconsidered for a future expiration
2022-04-04 12:07:47.517 DEBUG [scheduling-1] o.s.i.a.AggregatingMessageHandler : Group expiry candidate (d0643e59-4f37-4df6-8548-1be388cc895d) has changed - it may be reconsidered for a future expiration
See MessageGroupStoreReaper and how it can be configured as a scheduled task, e.g. by cron at the end of the day: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator.
There is a sample in XML, but the Java configuration is not much different:
<bean id="reaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="messageStore"/>
    <property name="timeout" value="30000"/>
</bean>

<task:scheduled-tasks scheduler="scheduler">
    <task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>
See @EnableScheduling and @Scheduled for the Java equivalent.
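As a rough Java counterpart of the XML sample, the reaper can be declared as a bean and triggered from a @Scheduled method. This is only a sketch: the class name ReaperConfiguration, the method name reapAtEndOfDay and the cron expression (23:00 every day) are assumptions chosen for illustration, and the 30000 ms timeout is simply carried over from the XML.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.store.MessageGroupStoreReaper;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

// Hypothetical configuration class; names are illustrative only.
@Configuration
@EnableScheduling
public class ReaperConfiguration {

    private final MessageGroupStore messageStore;

    public ReaperConfiguration(MessageGroupStore messageStore) {
        this.messageStore = messageStore;
    }

    @Bean
    public MessageGroupStoreReaper reaper() {
        MessageGroupStoreReaper reaper = new MessageGroupStoreReaper(this.messageStore);
        // Same value as in the XML sample: only groups older than 30 seconds are candidates.
        reaper.setTimeout(30000);
        return reaper;
    }

    // Assumed schedule: once a day at 23:00 (Spring cron fields: second minute hour day month weekday).
    @Scheduled(cron = "0 0 23 * * *")
    public void reapAtEndOfDay() {
        reaper().run();
    }
}
```

Note that the timeout still applies on a cron schedule: a group created shortly before the run may be too young to expire and would then wait for the next run.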
UPDATE
The logic in the aggregator looks like this:
if (groupSize > 0) {
    noOutput = false;
    if (this.releaseStrategy.canRelease(groupNow)) {
        completeGroup(correlationKey, groupNow, lock);
    }
    else {
        expireGroup(correlationKey, groupNow, lock);
    }
    if (!this.expireGroupsUponTimeout) {
        afterRelease(groupNow, groupNow.getMessages(), true);
        removeGroup = false;
    }
}
So, if your group is not empty (e.g. it holds 1 message) and cannot be released normally, it is expired, and then only the released messages are removed from the group; the group itself stays in the store. The next time the reaper runs, it reaches the next condition:
/*
 * By default empty groups are removed on the same schedule as non-empty
 * groups. A longer timeout for empty groups can be enabled by
 * setting minimumTimeoutForEmptyGroups.
 */
removeGroup =
        lastModifiedNow <= (System.currentTimeMillis() - this.minimumTimeoutForEmptyGroups);
And now the empty group is removed.
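The two-pass behaviour described above can be illustrated with a small plain-Java model. This is not the framework code: the Group class and the reap method are hypothetical stand-ins, and the model assumes the store has already decided the group is old enough and that expireGroupsUponTimeout is false.

```java
import java.util.ArrayList;
import java.util.List;

public class TwoPassExpiryModel {

    // Hypothetical stand-in for a stored message group; not a Spring Integration type.
    static final class Group {
        final List<String> messages = new ArrayList<>();
        long lastModified;
        boolean removed;
    }

    // One reaper pass over a group that is already an expiry candidate.
    static void reap(Group group, long now, long minimumTimeoutForEmptyGroups) {
        if (!group.messages.isEmpty()) {
            // First pass: the messages are released or expired and cleared, the empty group is kept.
            group.messages.clear();
            group.lastModified = now;
        }
        else if (group.lastModified <= now - minimumTimeoutForEmptyGroups) {
            // Later pass: the group is empty and old enough, so it is removed.
            group.removed = true;
        }
    }

    public static void main(String[] args) {
        Group group = new Group();
        group.messages.add("m1");

        reap(group, 3000L, 0L);
        System.out.println("after first pass: messages=" + group.messages.size() + ", removed=" + group.removed);

        reap(group, 5000L, 0L);
        System.out.println("after second pass: messages=" + group.messages.size() + ", removed=" + group.removed);
    }
}
```

In this model a non-empty group needs two reaper runs before it disappears, which is why the reaper interval and timeout both matter.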
Please verify in debug mode that your reaperTimeout is really sufficient to determine that your groups are old enough for expiration on the first and subsequent reaper cycles.
UPDATE 2
Here is a simple working example:
@SpringBootApplication
@EnableScheduling
public class So71617406Application {

    public static void main(String[] args) {
        SpringApplication.run(So71617406Application.class, args);
    }

    @Bean
    public MessageGroupStore jdbcMessageStoreSession(DataSource dataSource) {
        JdbcMessageStore jdbcMessageStore = new JdbcMessageStore(dataSource);
        jdbcMessageStore.setRegion("session");
        return jdbcMessageStore;
    }

    @Bean
    public MessageGroupStoreReaper messageGroupStoreSessionReaper(MessageGroupStore jdbcMessageStoreSession) {
        MessageGroupStoreReaper messageGroupStoreReaper = new MessageGroupStoreReaper();
        messageGroupStoreReaper.setTimeout(3000);
        messageGroupStoreReaper.setMessageGroupStore(jdbcMessageStoreSession);
        return messageGroupStoreReaper;
    }

    @Autowired
    @Lazy
    private MessageGroupStoreReaper messageGroupStoreSessionReaper;

    @Scheduled(fixedRate = 2000, initialDelay = 1000)
    @Transactional
    public void poll() {
        this.messageGroupStoreSessionReaper.run();
    }

    @Bean
    public IntegrationFlow aggregatorSession(MessageGroupStore jdbcMessageStoreSession) {
        return IntegrationFlows.from("aggregatorSessionChannel")
                .aggregate(g -> g
                        .transactional(true)
                        .sendPartialResultOnExpiry(true)
                        .expireGroupsUponCompletion(false)
                        .expireGroupsUponTimeout(false)
                        .messageStore(jdbcMessageStoreSession)
                        .correlationExpression("1")
                        .releaseStrategy(r -> r.size() == 2))
                .nullChannel();
    }

    @Bean
    ApplicationRunner applicationRunner(MessageChannel aggregatorSessionChannel) {
        return args -> {
            aggregatorSessionChannel.send(new GenericMessage<>("test1"));
            aggregatorSessionChannel.send(new GenericMessage<>("test2"));
        };
    }
}
The logs for it look like this:
2022-04-04 11:10:40.149 DEBUG 2760 --- [ main] o.s.i.a.AggregatingMessageHandler : bean 'aggregatorSession.aggregator#0' for component 'aggregatorSession.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'org.springframework.integration.stackoverflow.so71617406.So71617406Application'; from source: 'bean method aggregatorSession' received message: GenericMessage [payload=test1, headers={id=a98e8c8b-be9e-ba5d-d996-e6bb6513afe4, timestamp=1649085040146}]
2022-04-04 11:10:40.150 DEBUG 2760 --- [ main] o.s.i.a.AggregatingMessageHandler : Handling message with correlationKey [1]: GenericMessage [payload=test1, headers={id=a98e8c8b-be9e-ba5d-d996-e6bb6513afe4, timestamp=1649085040146}]
2022-04-04 11:10:40.166 DEBUG 2760 --- [ main] o.s.i.jdbc.store.JdbcMessageStore : Inserting message with id key=a98e8c8b-be9e-ba5d-d996-e6bb6513afe4
2022-04-04 11:10:40.174 DEBUG 2760 --- [ main] o.s.i.jdbc.store.JdbcMessageStore : Inserting message with id key=a98e8c8b-be9e-ba5d-d996-e6bb6513afe4 and created date=2022-04-04 11:10:40.164
2022-04-04 11:10:40.175 DEBUG 2760 --- [ main] o.s.i.jdbc.store.JdbcMessageStore : Creating message group with id key=c4ca4238-a0b9-3382-8dcc-509a6f75849b and created date=2022-04-04 11:10:40.164
2022-04-04 11:10:40.176 DEBUG 2760 --- [ main] o.s.i.store.PersistentMessageGroup : Lazy loading of group size for messageGroup: 1
2022-04-04 11:10:40.183 DEBUG 2760 --- [ main] o.s.integration.channel.DirectChannel : postSend (sent=true) on channel 'bean 'aggregatorSessionChannel'; defined in: 'org.springframework.integration.stackoverflow.so71617406.So71617406Application'; from source: 'bean method aggregatorSession'', message: GenericMessage [payload=test1, headers={id=a98e8c8b-be9e-ba5d-d996-e6bb6513afe4, timestamp=1649085040146}]
2022-04-04 11:10:40.183 DEBUG 2760 --- [ main] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'aggregatorSessionChannel'; defined in: 'org.springframework.integration.stackoverflow.so71617406.So71617406Application'; from source: 'bean method aggregatorSession'', message: GenericMessage [payload=test2, headers={id=64f3d9be-30c7-1ff3-eb36-3d821f0c76fd, timestamp=1649085040183}]
2022-04-04 11:10:40.183 DEBUG 2760 --- [ main] o.s.i.a.AggregatingMessageHandler : bean 'aggregatorSession.aggregator#0' for component 'aggregatorSession.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'org.springframework.integration.stackoverflow.so71617406.So71617406Application'; from source: 'bean method aggregatorSession' received message: GenericMessage [payload=test2, headers={id=64f3d9be-30c7-1ff3-eb36-3d821f0c76fd, timestamp=1649085040183}]
2022-04-04 11:10:40.183 DEBUG 2760 --- [ main] o.s.i.a.AggregatingMessageHandler : Handling message with correlationKey [1]: GenericMessage [payload=test2, headers={id=64f3d9be-30c7-1ff3-eb36-3d821f0c76fd, timestamp=1649085040183}]
2022-04-04 11:10:40.184 DEBUG 2760 --- [ main] o.s.i.jdbc.store.JdbcMessageStore : Inserting message with id key=64f3d9be-30c7-1ff3-eb36-3d821f0c76fd
2022-04-04 11:10:40.185 DEBUG 2760 --- [ main] o.s.i.jdbc.store.JdbcMessageStore : Inserting message with id key=64f3d9be-30c7-1ff3-eb36-3d821f0c76fd and created date=2022-04-04 11:10:40.164
2022-04-04 11:10:40.185 DEBUG 2760 --- [ main] o.s.i.jdbc.store.JdbcMessageStore : Updating MessageGroup: c4ca4238-a0b9-3382-8dcc-509a6f75849b
2022-04-04 11:10:40.186 DEBUG 2760 --- [ main] o.s.i.store.PersistentMessageGroup : Lazy loading of group size for messageGroup: 1
2022-04-04 11:10:40.187 DEBUG 2760 --- [ main] o.s.i.a.AggregatingMessageHandler : Completing group with correlationKey [1]
2022-04-04 11:10:40.187 DEBUG 2760 --- [ main] o.s.i.store.PersistentMessageGroup : Lazy loading of messages for messageGroup: 1
2022-04-04 11:10:40.191 DEBUG 2760 --- [ main] o.s.integration.channel.NullChannel : message sent to null channel: GenericMessage [payload=[test1, test2], headers={id=5442d9a6-3549-041c-7f49-557b3df62749, timestamp=1649085040191}]
2022-04-04 11:10:40.191 DEBUG 2760 --- [ main] o.s.i.jdbc.store.JdbcMessageStore : Completing MessageGroup: c4ca4238-a0b9-3382-8dcc-509a6f75849b
2022-04-04 11:10:40.191 DEBUG 2760 --- [ main] o.s.i.jdbc.store.JdbcMessageStore : Removing messages from group with group key=c4ca4238-a0b9-3382-8dcc-509a6f75849b
2022-04-04 11:10:40.193 DEBUG 2760 --- [ main] o.s.i.jdbc.store.JdbcMessageStore : Updating MessageGroup: c4ca4238-a0b9-3382-8dcc-509a6f75849b
2022-04-04 11:10:40.193 DEBUG 2760 --- [ main] o.s.integration.channel.DirectChannel : postSend (sent=true) on channel 'bean 'aggregatorSessionChannel'; defined in: 'org.springframework.integration.stackoverflow.so71617406.So71617406Application'; from source: 'bean method aggregatorSession'', message: GenericMessage [payload=test2, headers={id=64f3d9be-30c7-1ff3-eb36-3d821f0c76fd, timestamp=1649085040183}]
2022-04-04 11:10:41.159 DEBUG 2760 --- [ scheduling-1] o.s.i.store.MessageGroupStoreReaper : Expiring all messages older than timeout=3000 from message group store: org.springframework.integration.jdbc.store.JdbcMessageStore@31464a43
2022-04-04 11:10:43.156 DEBUG 2760 --- [ scheduling-1] o.s.i.store.MessageGroupStoreReaper : Expiring all messages older than timeout=3000 from message group store: org.springframework.integration.jdbc.store.JdbcMessageStore@31464a43
2022-04-04 11:10:45.147 DEBUG 2760 --- [ scheduling-1] o.s.i.store.MessageGroupStoreReaper : Expiring all messages older than timeout=3000 from message group store: org.springframework.integration.jdbc.store.JdbcMessageStore@31464a43
2022-04-04 11:10:45.148 DEBUG 2760 --- [ scheduling-1] o.s.i.store.PersistentMessageGroup : Lazy loading of group size for messageGroup: c4ca4238-a0b9-3382-8dcc-509a6f75849b
2022-04-04 11:10:45.149 DEBUG 2760 --- [ scheduling-1] o.s.i.a.AggregatingMessageHandler : Removing empty group: c4ca4238-a0b9-3382-8dcc-509a6f75849b
2022-04-04 11:10:45.149 DEBUG 2760 --- [ scheduling-1] o.s.i.jdbc.store.JdbcMessageStore : Removing relationships for the group with group key=c4ca4238-a0b9-3382-8dcc-509a6f75849b
2022-04-04 11:10:45.150 DEBUG 2760 --- [ scheduling-1] o.s.i.jdbc.store.JdbcMessageStore : Deleting messages with group key=c4ca4238-a0b9-3382-8dcc-509a6f75849b
Pay attention to the Removing empty group: entry. It looks like your group is, for some reason, not empty after its completion. Perhaps your outputSessionProcessor fails and throws an error, so the completed messages are not removed from the DB because of the transaction rollback...
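The timing in the working example's logs can be checked by hand. The sketch below assumes the store treats a group as an expiry candidate once its timestamp is at or before the reaper run time minus the timeout, and it uses the group's created date from the log as that timestamp; the class and method names are made up for illustration.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.util.List;

public class ReaperTimingCheck {

    // Assumed rule: a group qualifies once its timestamp is at or before (run time - timeout).
    static boolean isExpiryCandidate(LocalTime groupTimestamp, LocalTime reaperRun, Duration timeout) {
        return !groupTimestamp.isAfter(reaperRun.minus(timeout));
    }

    // Returns the first reaper run at which the group qualifies, or null if none does.
    static LocalTime firstQualifyingRun(LocalTime groupTimestamp, List<LocalTime> reaperRuns, Duration timeout) {
        for (LocalTime run : reaperRuns) {
            if (isExpiryCandidate(groupTimestamp, run, timeout)) {
                return run;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Values taken from the log lines of the working example.
        LocalTime groupCreated = LocalTime.parse("11:10:40.164");
        List<LocalTime> reaperRuns = List.of(
                LocalTime.parse("11:10:41.159"),
                LocalTime.parse("11:10:43.156"),
                LocalTime.parse("11:10:45.147"));

        // Prints 11:10:45.147, the run where "Removing empty group" appears in the log.
        System.out.println(firstQualifyingRun(groupCreated, reaperRuns, Duration.ofMillis(3000)));
    }
}
```

With a 3000 ms timeout the run at 11:10:43.156 is still 8 ms too early for a group created at 11:10:40.164, so the removal only happens on the third run.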