I have a message class, SomeMessage, that looks like this:
    class SomeMessage {
        id,
        title
    }
Currently, I aggregate messages based on id. Messages are released after 10 seconds.
    .aggregate(
        a ->
            a
                .outputProcessor(messageProcessor())
                .messageStore(messageGroupStore())
                .correlationStrategy(correlationStrategy())
                .expireGroupsUponCompletion(true)
                .sendPartialResultOnExpiry(true)
                .groupTimeout(TimeUnit.SECONDS.toMillis(10)))
    .handle(amqpOutboundEndpoint)
What I need is a way to throttle messages based on the title property. If title == "A", the group should still wait 10 seconds for aggregation. If title == "B", it should wait 60 seconds for aggregation, and the result should not be sent to amqpOutboundEndpoint immediately but throttled (e.g. 30 seconds between consecutive messages that have title == "B").
What would be the best way to do this? Is there something like throttling on AmqpOutboundEndpoint?
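To make the throttling requirement concrete, here is a minimal plain-Java sketch of the rule I have in mind for title == "B" messages: each message is delayed just long enough that consecutive sends are at least a fixed gap apart. The class TitleThrottle and the method reserveDelay are hypothetical names for illustration only; nothing here is part of Spring Integration.

```java
public class TitleThrottle {

    // Minimum gap between two consecutive sends, in milliseconds.
    private final long minGapMillis;

    // Earliest time at which the next message may be sent.
    private long nextAllowedMillis = Long.MIN_VALUE;

    TitleThrottle(long minGapMillis) {
        this.minGapMillis = minGapMillis;
    }

    // Returns how long a message arriving at nowMillis has to wait,
    // and reserves that send slot so the next caller queues behind it.
    synchronized long reserveDelay(long nowMillis) {
        long sendAt = Math.max(nowMillis, nextAllowedMillis);
        nextAllowedMillis = sendAt + minGapMillis;
        return sendAt - nowMillis;
    }

    public static void main(String[] args) {
        // Assumed gap of 30 seconds, as in the requirement above.
        TitleThrottle throttleB = new TitleThrottle(30_000);
        System.out.println(throttleB.reserveDelay(0));
        System.out.println(throttleB.reserveDelay(1_000));
    }
}
```

Messages with title == "A" would bypass this gate entirely; only "B" messages would ask it for a delay.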
UPDATE
                .groupTimeout(messageGroup -> {
                    if (anyMessageInGroupHasTitleB(messageGroup)) {
                        return TimeUnit.SECONDS.toMillis(60);
                    }
                    else {
                        return TimeUnit.SECONDS.toMillis(10);
                    }
                }))
    .route(
        (Function<SomeMessage, Boolean>) ec ->
            ec.getTitle().equals("B"),
        m -> m.subFlowMapping(true, sf ->
                sf.channel(channels -> channels.queue(1))
                    .bridge(e -> e.poller(Pollers
                        .fixedDelay(60, TimeUnit.SECONDS)
                        .maxMessagesPerPoll(1)
                    ))
            ).subFlowMapping(false, IntegrationFlowDefinition::bridge))
    .handle(amqpOutboundEndpoint)
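The anyMessageInGroupHasTitleB helper used in the groupTimeout function is not shown above. A minimal sketch of the idea, written against plain collections so that it runs without Spring: in the real flow it would presumably iterate messageGroup.getMessages() and inspect each payload instead. The nested SomeMessage class (with assumed String fields) and the GroupTimeouts class name are stand-ins for illustration.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;

public class GroupTimeouts {

    // Hypothetical stand-in for the payload class; the field types are assumed.
    static final class SomeMessage {
        final String id;
        final String title;

        SomeMessage(String id, String title) {
            this.id = id;
            this.title = title;
        }
    }

    // True if at least one message in the group has title "B".
    static boolean anyMessageInGroupHasTitleB(Collection<SomeMessage> group) {
        return group.stream().anyMatch(m -> "B".equals(m.title));
    }

    // 60 seconds if the group contains a "B" message, otherwise 10 seconds.
    static long groupTimeoutMillis(Collection<SomeMessage> group) {
        return anyMessageInGroupHasTitleB(group)
                ? TimeUnit.SECONDS.toMillis(60)
                : TimeUnit.SECONDS.toMillis(10);
    }

    public static void main(String[] args) {
        Collection<SomeMessage> onlyA = Arrays.asList(new SomeMessage("1", "A"));
        Collection<SomeMessage> mixed = Arrays.asList(
                new SomeMessage("2", "A"), new SomeMessage("2", "B"));
        System.out.println(groupTimeoutMillis(onlyA));
        System.out.println(groupTimeoutMillis(mixed));
    }
}
```

Comparing with "B".equals(...) rather than title.equals("B") keeps the check safe when a title is null.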
Use groupTimeoutExpression() instead of a fixed timeout...

    payload.title == 'A' ? 10000 : 30000