
How to aggregate messages from a queue channel using the Spring Integration DSL?


I define a queue channel:

@Bean("mail-action-laundry-list-channel")
public MessageChannel mailRecipientActionMessageChannel() {
    return new QueueChannel(20);
}

In the flow below I want to aggregate messages from the queue channel. I tried this:

@Bean
public IntegrationFlow mailRecipientActionLaundryListMessageFlow(@Qualifier("laundryListMessageHandler") MessageHandler laundryListMessageHandler) {
    return IntegrationFlows.from("mail-action-laundry-list-channel")
            .log("--> laundry list messages::")
            .aggregate(aggregatorSpec -> aggregatorSpec
                    .correlationExpression("#this.payload.email")
                    .releaseExpression("#this.size() == 5")
                    .messageStore(new SimpleMessageStore(100))
                    .groupTimeout(2000))
            .transform(laundryListMessageToItemProcessDtoTransformer())
            .handle(laundryListMessageHandler)
            .get();
}

But why does it always aggregate only the first 5 messages from the channel, and never aggregate the messages that come after them?


Solution

  • You need to configure expireGroupsUponCompletion(true) on the aggregator:

    When set to true (default false), completed groups are removed from the message store, allowing subsequent messages with the same correlation to form a new group. The default behavior is to send messages with the same correlation as a completed group to the discard-channel.

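    It looks like the subsequent messages from your queue have the same email property as the first five. The group for that email has already completed and is still in the message store, so the aggregator can't form a new group for the same correlation key and discards the later messages.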

    https://docs.spring.io/spring-integration/docs/5.0.3.RELEASE/reference/html/messaging-routing-chapter.html#aggregator-config
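
Applied to the flow from the question, the change might look like the sketch below. It is a fragment rather than a standalone program: it assumes the same configuration class, the same laundryListMessageHandler bean and the same laundryListMessageToItemProcessDtoTransformer() method as in the question, and the only new line is the expireGroupsUponCompletion(true) call.

```java
@Bean
public IntegrationFlow mailRecipientActionLaundryListMessageFlow(
        @Qualifier("laundryListMessageHandler") MessageHandler laundryListMessageHandler) {
    return IntegrationFlows.from("mail-action-laundry-list-channel")
            .log("--> laundry list messages::")
            .aggregate(aggregatorSpec -> aggregatorSpec
                    .correlationExpression("#this.payload.email")
                    .releaseExpression("#this.size() == 5")
                    // Remove a group from the store once it has been released, so the
                    // next message with the same email starts a fresh group instead of
                    // being sent to the discard channel.
                    .expireGroupsUponCompletion(true)
                    .messageStore(new SimpleMessageStore(100))
                    .groupTimeout(2000))
            .transform(laundryListMessageToItemProcessDtoTransformer())
            .handle(laundryListMessageHandler)
            .get();
}
```

With this setting, every run of five messages that share an email should be released as its own group, rather than only the first five.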