Search code examples
spring-bootspring-integrationspring-amqpspring-integration-dsl

Spring Integration aggregator only releases one group coming from AMQP backed channel


I'm having an issue with my Spring Boot app where the only one group is processed in my aggregator and then the app stops consuming more messages from the queue. It only processes a group at startup it seems. I restarted the app and it processed another group, but then it just stopped again.

This is my flow below.

return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, importQueueName).errorChannel(errorChannel))
                .split(userImportSplitter)
                .channel(Amqp.channel(connectionFactory)
                        .queueName(USER_QUEUE_NAME)
                        .prefetchCount(batchSize))
                .aggregate(a -> a.releaseStrategy(g -> g.size() >= batchSize)
                        .sendPartialResultOnExpiry(true)
                        .groupTimeout(500))
                .handle(userImporter)
                .get();

Solution

  • Probably your userImportSplitter produces the same conrrelationId, therefore only one group is formed in the aggregator and by default it is not removed after release or timeout.

    Consider to use these options:

    .aggregate(a -> a.releaseStrategy(g -> g.size() >= batchSize)
                        .sendPartialResultOnExpiry(true)
                        .groupTimeout(500)
                        .expireGroupsUponCompletion(true)
                        .expireGroupsUponTimeout(true))