I have an issue with some messages going missing after aggregation. I need to aggregate my messages into groups with the same number of elements. In my current case I have 118 elements in messageChannel. These messages are successfully combined into 11 groups of 10 elements each, but the last 8 are lost.
    IntegrationFlows
            .from(messageChannel)
            .split(s -> s
                    .applySequence(false).get().getT2().setDelimiters("[\r\n]"))
            .aggregate(s -> s
                    .correlationExpression("payload")
                    .releaseExpression("size() >= 10")
                    .expireGroupsUponCompletion(true)
            )
            .handle(h ->
                    System.out.println(h))
            .get();
I expect to receive the 8 remaining messages in a new group.
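To make the arithmetic concrete, here is a minimal plain-Java sketch of a size-only release rule. It is not Spring Integration code: the class `SizeOnlyRelease` and its `release` method are hypothetical, and it assumes all items land in a single group at a time. It only illustrates why 118 items with a "release at 10" rule leave 8 items waiting.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; this is not Spring Integration code.
class SizeOnlyRelease {

    // Collects items and releases a group only when it reaches groupSize,
    // similar in spirit to a release rule such as "size() >= 10".
    // Items that never fill a group are appended to leftover.
    static List<List<Integer>> release(List<Integer> items, int groupSize, List<Integer> leftover) {
        List<List<Integer>> released = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        for (Integer item : items) {
            current.add(item);
            if (current.size() >= groupSize) {
                released.add(current);
                current = new ArrayList<>();
            }
        }
        leftover.addAll(current);
        return released;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < 118; i++) {
            items.add(i);
        }
        List<Integer> leftover = new ArrayList<>();
        List<List<Integer>> released = release(items, 10, leftover);
        // 118 = 11 * 10 + 8, so 11 groups are released and 8 items remain.
        System.out.println(released.size() + " groups released, "
                + leftover.size() + " items still waiting");
    }
}
```

With a size-only rule nothing ever triggers the release of the final, incomplete group, which matches the 8 missing messages.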
Maybe a group timeout, combined with expireGroupsUponTimeout, can help here:
    IntegrationFlows
            .from(messageChannel)
            .split(s -> s
                    .applySequence(false).get().getT2().setDelimiters("[\r\n]"))
            .aggregate(s -> s
                    .correlationExpression("payload")
                    .releaseExpression("size() >= 10")
                    .expireGroupsUponCompletion(true)
                    // expire an incomplete group after 500 ms and send what it holds
                    .groupTimeout(500)
                    .sendPartialResultOnExpiry(true)
                    .expireGroupsUponTimeout(true)
            )
            .handle(h ->
                    System.out.println(h))
            .get();
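The idea behind a timeout-based release can be sketched in plain Java. This is only an illustration, not how the aggregator is implemented: the class `TimeoutBatcher`, its methods, and the explicit clock values are all hypothetical, and measuring idle time from the last arrival is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; the class and method names are hypothetical.
class TimeoutBatcher {
    private final int groupSize;
    private final long timeoutMillis;
    private final List<List<String>> released = new ArrayList<>();
    private List<String> current = new ArrayList<>();
    private long lastArrival;

    TimeoutBatcher(int groupSize, long timeoutMillis) {
        this.groupSize = groupSize;
        this.timeoutMillis = timeoutMillis;
    }

    // Adds an item and releases the group as soon as it is full.
    void add(String item, long nowMillis) {
        current.add(item);
        lastArrival = nowMillis;
        if (current.size() >= groupSize) {
            flush();
        }
    }

    // Meant to be called periodically: releases a partial group
    // once it has been idle for at least timeoutMillis.
    void tick(long nowMillis) {
        if (!current.isEmpty() && nowMillis - lastArrival >= timeoutMillis) {
            flush();
        }
    }

    private void flush() {
        released.add(current);
        current = new ArrayList<>();
    }

    List<List<String>> released() {
        return released;
    }

    public static void main(String[] args) {
        TimeoutBatcher batcher = new TimeoutBatcher(10, 500);
        for (int i = 0; i < 118; i++) {
            batcher.add("line-" + i, i); // simulated arrivals at t = 0..117 ms
        }
        System.out.println("full groups: " + batcher.released().size());
        batcher.tick(117 + 500); // 500 ms after the last arrival
        List<String> last = batcher.released().get(batcher.released().size() - 1);
        System.out.println("groups after timeout: " + batcher.released().size()
                + ", last group size: " + last.size());
    }
}
```

The size rule still produces the 11 full groups, and the timeout check is what pushes out the remaining 8 items as a twelfth, smaller group.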