How to run parallel Aggregators in Spring Integration?


I'd like to run a Spring Integration flow where I can easily scale up instances of components if capacity of any is reached.

In particular, I wonder how to scale Aggregators in the following scenario: various components right before the Aggregator layer produce different parts of objects of class X. Let's say they produce parts of two such objects, X1 and X2; the parts are called {a1, b1} and {a2, b2}, respectively. The Aggregators should now construct X1 and X2 from their parts and send them on. Let's also assume there are two Aggregators, A1 and A2.

How can I set it up most easily so that it works as expected, i.e. X1 and X2 are created and sent off?

I see the following considerations:

  • If A1 gets a1 and A2 gets b1, X1 can't be constructed without some extra coordination.
  • We want some load balancing, which was the reason for multiple Aggregators in the first place.
  • Extra Aggregators should be easy to add if needed; a static configuration of the number of Aggregators is to be avoided.

I wonder if the following will work for me - this is based on Spring Integration docs, but I'm not sure if I got it all right.

  1. Set up a Redis Message Store, where parts of X1 and X2 will be stored before all are available.
  2. Share the Message Store between all the Aggregator instances. This is setting (6) in the Aggregator configuration.
  3. (As usual for an Aggregator) Tag the parts (a's and b's) of the same X with the same CORRELATION_ID. Write a ReleaseStrategy that releases the group once both a and b have been received.
  4. Create a Redis Lock Registry and configure all Aggregators to use it. This is setting (20) in the Aggregator configuration.
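
The four steps above might be wired roughly as follows. This is only a sketch, assuming the spring-integration-redis module is on the classpath and a RedisConnectionFactory bean already exists. The channel names partsChannel and assembledChannel, the registry key "aggregator-locks", and the "two parts per X" release rule are placeholders invented for illustration, not taken from the docs.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.integration.aggregator.AggregatingMessageHandler;
import org.springframework.integration.aggregator.DefaultAggregatingMessageGroupProcessor;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.redis.store.RedisMessageStore;
import org.springframework.integration.redis.util.RedisLockRegistry;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.messaging.MessageHandler;

@Configuration
public class SharedAggregatorConfig {

    // Steps 1 and 2: one Redis-backed store that every Aggregator instance points at.
    @Bean
    public MessageGroupStore sharedStore(RedisConnectionFactory connectionFactory) {
        return new RedisMessageStore(connectionFactory);
    }

    // Step 4: a Redis-backed lock registry; the key is a hypothetical name.
    @Bean
    public LockRegistry sharedLocks(RedisConnectionFactory connectionFactory) {
        return new RedisLockRegistry(connectionFactory, "aggregator-locks");
    }

    @Bean
    @ServiceActivator(inputChannel = "partsChannel") // hypothetical channel name
    public MessageHandler aggregator(MessageGroupStore sharedStore, LockRegistry sharedLocks) {
        AggregatingMessageHandler handler =
                new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(), sharedStore);
        // Step 3: the default correlation strategy reads the CORRELATION_ID header.
        // Assumption: every X consists of exactly two parts (a and b).
        handler.setReleaseStrategy(group -> group.size() == 2);
        handler.setLockRegistry(sharedLocks);
        // Remove completed groups so the same correlation id can be reused later.
        handler.setExpireGroupsUponCompletion(true);
        handler.setOutputChannelName("assembledChannel"); // hypothetical channel name
        return handler;
    }
}
```

Every application instance that loads this configuration would contribute one Aggregator, so adding capacity would mean starting another instance rather than editing the configuration.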

Will this do? In particular:

  1. Can I share a MessageStore between multiple Aggregators?
  2. When sharing a MessageStore, if A1 writes a1 into it and A2 writes b1, will my ReleaseStrategy see that X1 is now ready to be assembled?
  3. Will only one Aggregator process the aggregation using the ReleaseStrategy and send off the assembled X, because I'm using locks?

Many thanks!


Solution

  • Yes, it will work, but you don't need Redis unless you need persistence or your aggregators run on different boxes (see below for that case); you can share the same SimpleMessageStore between the aggregators. By default, each aggregator uses its own in-memory SimpleMessageStore.

    And, yes, locks (by correlationId) ensure that only one aggregator processes a given group at a time.

    ... because I'm using locks ...

    The aggregator uses its own locks internally; you don't need to lock anything yourself. In fact, with a global LockRegistry (Gemfire, Redis, or a custom implementation) and a persistent message group store (Redis etc.), your aggregators can run on different JVMs.
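
To make the locking argument concrete, here is a small plain-Java model of the idea. It is not Spring Integration code: the class SharedStoreSketch, its maps, and the handle method are hypothetical stand-ins for a shared message store, a lock registry keyed by correlation id, and an aggregator's handling of one part. It also assumes each X has exactly two parts.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative model only; the names below are invented for this sketch.
class SharedStoreSketch {

    // Stand-in for the shared message store: correlation id -> parts seen so far.
    static final Map<String, List<String>> store = new ConcurrentHashMap<>();
    // Stand-in for the shared lock registry: one lock per correlation id.
    static final Map<String, Lock> locks = new ConcurrentHashMap<>();
    // Assembled objects that were "sent on" downstream.
    static final List<String> released = new CopyOnWriteArrayList<>();

    // What any aggregator instance does when a part arrives.
    static void handle(String correlationId, String part) {
        Lock lock = locks.computeIfAbsent(correlationId, id -> new ReentrantLock());
        lock.lock(); // only one aggregator touches this group at a time
        try {
            List<String> group = store.computeIfAbsent(correlationId, id -> new ArrayList<>());
            group.add(part);
            if (group.size() == 2) { // release rule: both parts have arrived
                List<String> parts = new ArrayList<>(group);
                Collections.sort(parts);
                released.add(correlationId + parts);
                store.remove(correlationId); // the group is complete, drop it
            }
        } finally {
            lock.unlock();
        }
    }

    // Two threads play aggregators A1 and A2; each sees only one part of every X.
    static List<String> run() {
        store.clear();
        locks.clear();
        released.clear();
        Thread a1 = new Thread(() -> { handle("X1", "a1"); handle("X2", "b2"); });
        Thread a2 = new Thread(() -> { handle("X1", "b1"); handle("X2", "a2"); });
        a1.start();
        a2.start();
        try {
            a1.join();
            a2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        List<String> result = new ArrayList<>(released);
        Collections.sort(result); // sort so the result does not depend on thread timing
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [X1[a1, b1], X2[a2, b2]]
    }
}
```

Whichever thread stores the second part of a group is the one that releases it, so each X is sent on exactly once even though neither "aggregator" ever sees both parts itself.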