Search code examples
javaspringspring-integrationsolace

Spring - How to read multiple messages off the queue and persist them in bulk


I'm trying to improve the performance of the system built with (Spring Integration + Solace messaging system) by reading the messages off the queue in bulk (let's say 50 at a time) and persisting them into the DB (in bulk). When I use a Spring Integration poller in inbound-channel-adapter with the below config:

<int:poller
    messages-per-poll="100"
    rate="1000"
/>

I have 2 issues:

  1. In ServiceActivator I will still need to receive one message at a time (ServiceActivator method doesn't allow method declaration with List> parameter

  2. Even by reading 1 message at a time I can't collect them into a list (by 50) to persist in bulk, because there might be only 40 messages on the queue and I would need to (infinitely) wait for another 10 to persist.

I tried to solve that with com.solacesystems.jcsmp.FlowReceiver, it has a drawback that I can't receive the message headers.

What can you suggest?


Solution

  • The Aggregator EIP is for you. It can be configured to group messages to 50 let's say by the Thread.currentThread.getId() as correlation key. And also it can release groups by the fact "empty" message via AbstractMessageSourceAdvice.afterReceive():

    /**
     * Subclasses can take actions based on the result of the poll; e.g.
     * adjust the {@code trigger}. The message can also be replaced with a new one.
     * @param result the received message.
     * @param source the message source.
     * @return a message to continue to process the result, null to discard whatever the poll returned.
     */
    public abstract Message<?> afterReceive(Message<?> result, MessageSource<?> source);
    

    So, when result is null, you should trigger MessageGroupStoreReaper to expire non-50 groups in the aggregator.

    See Reference Manual.