java spring spring-integration spring-amqp

Spring Integration Aggregator Throttler


I have a message type, SomeMessage, that looks like this:

class SomeMessage {
    // Field types are not given in the question; String is assumed here.
    String id;
    String title;
}

Currently, I aggregate messages based on id. Messages are released after 10 seconds.

.aggregate(a -> a
        .outputProcessor(messageProcessor())
        .messageStore(messageGroupStore())
        .correlationStrategy(correlationStrategy())
        .expireGroupsUponCompletion(true)
        .sendPartialResultOnExpiry(true)
        .groupTimeout(TimeUnit.SECONDS.toMillis(10)))
.handle(amqpOutboundEndpoint)

What I need is a way to throttle messages based on the title property. If title=="A", the group should still wait 10 seconds for aggregation. If title=="B", the group should wait 60 seconds for aggregation, and the result should not be sent to amqpOutboundEndpoint immediately; it should be throttled (e.g. at least 30 seconds between consecutive messages that have title=="B").
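The pacing requirement for title "B" can be made concrete with a small plain-Java sketch. This is not Spring Integration code, and the class and method names (MinGapPacer, sendTimes) are made up for illustration. It only computes when each message would be sent if consecutive sends must be at least a given gap apart.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only: models the "minimum gap between sends" rule.
final class MinGapPacer {
    private MinGapPacer() {}

    /**
     * @param readyTimesMillis times (ascending, in ms) at which each message becomes ready to send
     * @param minGapMillis     minimum spacing between two consecutive sends
     * @return the time at which each message would actually be sent
     */
    static List<Long> sendTimes(List<Long> readyTimesMillis, long minGapMillis) {
        List<Long> result = new ArrayList<>();
        long earliestNext = Long.MIN_VALUE; // no previous send yet, so no restriction
        for (long ready : readyTimesMillis) {
            // A message goes out when it is ready, or later if the gap has not elapsed.
            long send = Math.max(ready, earliestNext);
            result.add(send);
            earliestNext = send + minGapMillis;
        }
        return result;
    }
}
```

With a 30 second gap, messages that become ready at 0 s, 5 s and 100 s would go out at 0 s, 30 s and 100 s: the second one is delayed, the third is not.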

What would be the best way to do this? Is there something like throttling on AmqpOutboundEndpoint?

UPDATE

.groupTimeout(messageGroup -> {
    if (anyMessageInGroupHasTitleB(messageGroup)) {
        return TimeUnit.SECONDS.toMillis(60);
    }
    else {
        return TimeUnit.SECONDS.toMillis(10);
    }
}))
.route(
    (Function<SomeMessage, Boolean>) ec -> ec.getTitle().equals("B"),
    m -> m
        .subFlowMapping(true, sf -> sf
            .channel(channels -> channels.queue(1))
            .bridge(e -> e.poller(Pollers
                .fixedDelay(60, TimeUnit.SECONDS)
                .maxMessagesPerPoll(1))))
        .subFlowMapping(false, IntegrationFlowDefinition::bridge))
.handle(amqpOutboundEndpoint)
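The helper anyMessageInGroupHasTitleB is not shown in the snippet. A minimal sketch of the check and the resulting timeout choice might look like the following. To stay self-contained it works on a plain collection of payloads rather than on Spring's MessageGroup, and the SomeMessage stand-in (with String fields) and the GroupTimeouts class are assumptions for illustration. In the real flow the payloads would presumably be extracted from the group's messages first.

```java
import java.util.Collection;
import java.util.concurrent.TimeUnit;

// Stand-in for the payload type from the question; String fields are an assumption.
final class SomeMessage {
    private final String id;
    private final String title;

    SomeMessage(String id, String title) {
        this.id = id;
        this.title = title;
    }

    String getId() { return id; }
    String getTitle() { return title; }
}

// Hypothetical helper class, for illustration only.
final class GroupTimeouts {
    private GroupTimeouts() {}

    // True if at least one payload in the group has title "B" (null-safe on the title).
    static boolean anyMessageHasTitleB(Collection<SomeMessage> payloads) {
        return payloads.stream().anyMatch(m -> "B".equals(m.getTitle()));
    }

    // Mirrors the lambda above: 60 s if any "B" is present, otherwise 10 s.
    static long groupTimeoutMillis(Collection<SomeMessage> payloads) {
        return anyMessageHasTitleB(payloads)
                ? TimeUnit.SECONDS.toMillis(60)
                : TimeUnit.SECONDS.toMillis(10);
    }
}
```

Keeping the rule in one small method makes it easy to unit test separately from the integration flow.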

Solution

  • Use groupTimeoutExpression() instead of a fixed timeout...

    payload.title == 'A' ? 10000 : 30000