Search code examples
springspring-integrationspring-integration-dsl

Spring integration aggregate messages every 10 seconds


In one of my flows I'm looking at aggregating every 10 seconds, and writing those payloads to a fileshare. I'm not very clear about using the aggregator.

@Bean
public IntegrationFlow errorHandlingQueueFlow() {
    return IntegrationFlows.from(ERROR_QUEUE_CHANNEL)
            .bridge(e -> e.poller(Pollers.fixedDelay(1000).maxMessagesPerPoll(MAX_MSG_PER_POLL)))                
            .aggregate(a -> a.groupTimeout(10000))// How do i make it collect every 10 seconds.
            .transform(objectToCSVTransformer, "transform")//Converts payload to a CSV
            .handle(smbErrorMessageHandler())// Takes care of writing into Fileshare
            .get();
}

As this is for error handling, Only a few messages which errored out would be coming into this ERROR_QUEUE_CHANNEL. So I would like to collect it every 10 seconds, and not wait for all the messages from a group to be received. When I use grouptimeout, every 10 seconds all messages which have been collected are sent to nullchannel.


Solution

  • The default purpose of groupTimeout() is to clean up groups which are expired. If you'd like to release them normally instead of discarding, you should consider to use a sendPartialResultOnExpiry = true. This all, of course, makes sense if you really have correlation details headers in those messages. Otherwise you need to think about correlationStrategy to make those error messages to be grouped.

    Please, read more about an aggregator and its options in the docs: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator