Search code examples
javaspringspring-integrationspring-integration-dsl

Missing last group after aggregation


I have an issue with missing some messages after aggregation. I need to aggregate my messages into groups with same number of elements. For my current problem I have 118 elements in messageChannel. These messages succesfully combines into 11 groups with 10 elements for each group. But last 8 have been lost

IntegrationFlows
                .from(messageChannel)
                .split(s -> s
                        .applySequence(false).get().getT2().setDelimiters("[\r\n]"))
                .aggregate(s -> s
                                .correlationExpression("payload")
                                .releaseExpression("size() >= 10")
                                .expireGroupsUponCompletion(true)
                )
                .handle(h ->
                        System.out.println(h))

                .get();

I expect receiving 8 lost messages into new group


Solution

  • Maybe expireGroupsUponTimeout can help here:

    IntegrationFlows
                .from(messageChannel)
                .split(s -> s
                        .applySequence(false).get().getT2().setDelimiters("[\r\n]"))
                .aggregate(s -> s
                                .correlationExpression("payload")
                                .releaseExpression("size() >= 10")
                                .expireGroupsUponCompletion(true)
                                .expireGroupsUponTimeout( 500 )
                )
                .handle(h ->
                        System.out.println(h))
    
                .get();