Aggregator behavior on server restart - spring integration


Premise:

In Spring Integration, I have an aggregator with an incomplete message group. The server is restarted before the group's release strategy is met.

  • Current behavior: after the restart, all messages posted to the aggregator go to the same message group rather than a new one. Since the group is not marked complete, messages keep flowing into it.
  • Expected: when the server is restarted, the aggregator picks up the leftover messages from the message store, marks the already persisted groups complete, and then handles new messages.

Is my expectation incorrect? Can somebody guide me?


Solution

  • I think you can meet your requirements with MessageGroupStoreReaper, run once at server startup, e.g. from a listener for ContextRefreshedEvent. From the reference documentation:

    The MessageGroupStore maintains a list of these callbacks which it applies, on demand, to all messages whose timestamp is earlier than a time supplied as a parameter (see the registerMessageGroupExpiryCallback(..) and expireMessageGroups(..) methods above).

    The expireMessageGroups method can be called with a timeout value. Any message older than the current time minus this value will be expired, and have the callbacks applied. Thus it is the user of the store that defines what is meant by message group "expiry".

    http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#reaper
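
The expiry rule quoted above can be sketched in plain Java. This is only a toy in-memory model, not Spring Integration's MessageGroupStore: the class ToyGroupStore and its methods are hypothetical stand-ins for registerMessageGroupExpiryCallback(..) and expireMessageGroups(..), and the current time is passed in explicitly so the behavior is easy to follow. Treating a group created exactly at the threshold as expired is a choice made for this sketch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Toy in-memory model of the expiry rule quoted above.
// NOT Spring Integration's MessageGroupStore: every name here is hypothetical.
class ToyGroupStore {
    // One incomplete group: its creation time plus the payloads collected so far.
    private static final class Group {
        final long createdAt;
        final List<String> payloads = new ArrayList<>();
        Group(long createdAt) { this.createdAt = createdAt; }
    }

    private final Map<String, Group> groups = new LinkedHashMap<>();
    private final List<BiConsumer<String, List<String>>> callbacks = new ArrayList<>();

    // Adds a payload to its group, creating the group on first use.
    void add(String groupId, String payload, long nowMillis) {
        groups.computeIfAbsent(groupId, id -> new Group(nowMillis)).payloads.add(payload);
    }

    // Registers a callback that receives each expired group's id and payloads.
    void registerExpiryCallback(BiConsumer<String, List<String>> callback) {
        callbacks.add(callback);
    }

    // Expires every group created at or before (nowMillis - timeoutMillis),
    // applies the callbacks to it, and returns how many groups were expired.
    int expireGroups(long timeoutMillis, long nowMillis) {
        long threshold = nowMillis - timeoutMillis;
        int expired = 0;
        Iterator<Map.Entry<String, Group>> it = groups.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Group> entry = it.next();
            String groupId = entry.getKey();
            Group group = entry.getValue();
            if (group.createdAt <= threshold) {
                it.remove(); // a later message with this id starts a fresh group
                for (BiConsumer<String, List<String>> callback : callbacks) {
                    callback.accept(groupId, group.payloads);
                }
                expired++;
            }
        }
        return expired;
    }

    // Number of groups still held by the store.
    int size() { return groups.size(); }
}

class StartupExpiryDemo {
    public static void main(String[] args) {
        ToyGroupStore store = new ToyGroupStore();
        store.registerExpiryCallback((id, payloads) ->
                System.out.println("released partial group " + id + " " + payloads));
        store.add("order-1", "a", 1_000L);  // left over from before the "restart"
        store.add("order-1", "b", 1_500L);
        // At "startup" (time 10_000) a timeout of 0 expires everything already stored.
        store.expireGroups(0L, 10_000L);
        store.add("order-1", "c", 10_500L); // lands in a fresh group
    }
}
```

In this model, calling expireGroups with a timeout of 0 once at startup flushes every group left over from the previous run, so messages that arrive afterwards begin new groups. Whether a zero timeout is appropriate in a real application depends on how the aggregator and its store are configured.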