Search code examples
javaspringspring-integrationspring-rabbitspring-integration-amqp

How to consume all messages required in a Spring IntegrationFlow when message count is greater than the number of concurrent consumers?


I have an integration flow defined like this:

IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
                    .id("id")
                    .autoStartup(autoStartup)
                    .concurrentConsumers(2)
                    .maxConcurrentConsumers(3)
                    .messageConverter(messageConverter()))
                    .aggregate(a -> a.correlationExpression("payload.entityId")
                                    .releaseExpression("size() eq iterator().next().payload.batchSize")
                                    .sendPartialResultOnExpiry(true)
                                    .groupTimeout(2000)
                                    .expireGroupsUponCompletion(true)
                                    .outputProcessor(myMessageGroupProcessor))
                    .handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
                    .get();

Whose purpose is to group several related messages that are sent in a "batch". Here's an example:

// Message 1
{ "name": "Message1", 
  "entityId": "someId"
  "batchSize": 2,
  "batchIndex": 1, 
  .... }

// Message 2
{ "name": "Message2",
  "entityId": "someId"
  "batchSize": 2,
  "batchIndex": 2, 
  .... }

Due to reasons described here we're using manual ack:ing to RabbitMQ to avoid losing messages.

The integration flow works great for batches of size 2 but as soon as there's more than 2 messages in a batch we run into trouble:

[my-service] 2017-12-04 17:46:07.966  INFO 1 --- [ask-scheduler-5] x.y.EntityUpdater : Will update entity [entitId] from messages: Message1, Message2 
[my-service] 2017-12-04 17:46:09.976  INFO 1 --- [ask-scheduler-3] x.y.EntityUpdater : Will update entity [entitId] from messages: Message3

Note that the time between the logged messages is roughly 2 seconds (i.e. what we've confirgured as groupTimeout).

What I suspect the reason for this is is that Spring consumes 2 messages (which are not ack:ed automatically) then the aggregation waits for a 3rd message (since batchSize is 3 in this case). But this message will never be consumed within the 2 second window since there are only two concurrent consumers.

Increasing the concurrentConsumers count to 3 solves this particular issue. The problem is that we don't know the size of the batches that we receive and they could be quite large, perhaps of size 50 or so. This means that simply increasing concurrentConsumers is not a viable option.

What's the appropriate way to handle this in Spring?


Solution

  • As I discussed in the comments to this answer...

    When using this pattern, the concurrency * prefetch must be big enough to contain the messages for all outstanding batches.

    For this reason, I am not in favor of using the pattern, unless you have fairly predictable data.