I have an integration flow defined like this:
IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
        .id("id")
        .autoStartup(autoStartup)
        .concurrentConsumers(2)
        .maxConcurrentConsumers(3)
        .messageConverter(messageConverter()))
        // correlate on entityId; release when the group size reaches the
        // batchSize carried in each payload, or after 2s with a partial group
        .aggregate(a -> a.correlationExpression("payload.entityId")
                .releaseExpression("size() eq iterator().next().payload.batchSize")
                .sendPartialResultOnExpiry(true)
                .groupTimeout(2000)
                .expireGroupsUponCompletion(true)
                .outputProcessor(myMessageGroupProcessor))
        .handle(serviceActivatorBean, "myMethod",
                e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
        .get();
Its purpose is to group several related messages that are sent as a "batch". Here's an example:
// Message 1
{ "name": "Message1",
  "entityId": "someId",
  "batchSize": 2,
  "batchIndex": 1,
  ... }
// Message 2
{ "name": "Message2",
  "entityId": "someId",
  "batchSize": 2,
  "batchIndex": 2,
  ... }
For reasons described here, we're using manual acknowledgment with RabbitMQ to avoid losing messages.
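For context, a minimal sketch of the manual-ack side (assuming the adapter spec exposes the standard Spring AMQP acknowledgeMode container property; the queue name matches the flow above):

Amqp.inboundAdapter(connectionFactory, "queueName")
        // the container no longer acks on receipt; the flow must ack explicitly
        .acknowledgeMode(AcknowledgeMode.MANUAL)

// With MANUAL mode, each inbound message carries AmqpHeaders.CHANNEL and
// AmqpHeaders.DELIVERY_TAG headers, which a downstream component (e.g. the
// aggregator's output processor) can use to call basicAck on the channel
// once a group has been released.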
The integration flow works great for batches of size 2, but as soon as there are more than 2 messages in a batch we run into trouble:
[my-service] 2017-12-04 17:46:07.966 INFO 1 --- [ask-scheduler-5] x.y.EntityUpdater : Will update entity [entitId] from messages: Message1, Message2
[my-service] 2017-12-04 17:46:09.976 INFO 1 --- [ask-scheduler-3] x.y.EntityUpdater : Will update entity [entitId] from messages: Message3
Note that the time between the logged messages is roughly 2 seconds, i.e. what we've configured as groupTimeout.
What I suspect the reason is: Spring consumes 2 messages (which are not acked automatically), and the aggregation then waits for a 3rd message (since batchSize is 3 in this case). But this message will never be consumed within the 2-second window, since there are only two concurrent consumers.
Increasing the concurrentConsumers count to 3 solves this particular issue. The problem is that we don't know the size of the batches we receive, and they could be quite large, perhaps around 50 messages. This means that simply increasing concurrentConsumers is not a viable option.
What's the appropriate way to handle this in Spring?
As I discussed in the comments to this answer...
When using this pattern, concurrency * prefetch must be big enough to contain the messages for all outstanding batches.
For this reason, I am not in favor of using the pattern, unless you have fairly predictable data.
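To make the sizing rule concrete, here is a sketch applied to the flow above (assuming the adapter spec exposes the standard prefetchCount container property; the numbers are hypothetical):

// prefetch is a per-consumer basic.qos limit on unacked deliveries, so with
// manual ack the container can hold at most concurrency * prefetch messages
// in flight. For batches of up to 50 messages across, say, 4 groups awaiting
// release at once: 2 consumers * 100 prefetch = 200 >= 4 * 50 outstanding.
Amqp.inboundAdapter(connectionFactory, "queueName")
        .concurrentConsumers(2)
        .prefetchCount(100)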