I'm trying to improve the performance of the system built with (Spring Integration + Solace messaging system) by reading the messages off the queue in bulk (let's say 50 at a time) and persisting them into the DB (in bulk). When I use a Spring Integration poller in inbound-channel-adapter with the below config:
<int:poller messages-per-poll="100" rate="1000" />
I have 2 issues:
In ServiceActivator I will still need to receive one message at a time (ServiceActivator method doesn't allow method declaration with List> parameter
Even by reading 1 message at a time I can't collect them into a list (by 50) to persist in bulk, because there might be only 40 messages on the queue and I would need to (infinitely) wait for another 10 to persist.
I tried to solve that with com.solacesystems.jcsmp.FlowReceiver, it has a drawback that I can't receive the message headers.
What can you suggest?
The Aggregator
EIP is for you. It can be configured to group messages to 50
let's say by the Thread.currentThread.getId()
as correlation key. And also it can release groups by the fact "empty" message via AbstractMessageSourceAdvice.afterReceive()
:
/**
* Subclasses can take actions based on the result of the poll; e.g.
* adjust the {@code trigger}. The message can also be replaced with a new one.
* @param result the received message.
* @param source the message source.
* @return a message to continue to process the result, null to discard whatever the poll returned.
*/
public abstract Message<?> afterReceive(Message<?> result, MessageSource<?> source);
So, when result is null
, you should trigger MessageGroupStoreReaper
to expire non-50 groups in the aggregator.
See Reference Manual.