Tags: design-patterns, integration, spring-integration, messaging, spring-amqp

Throttling inbound AMQP messages


My system looks like this:

[Q1] --> Service1 --> [Q2] --> ...(processing)... --> ServiceN --> [Outbound queue]

The queues are RabbitMQ 3.5.6. I'm using Spring Integration 4.2.1, Spring AMQP 1.5.1 and Spring Integration Java DSL 1.1.0.

I would like to throttle how fast Service1 consumes messages from Q1, depending on how many messages are currently being processed and have not yet reached the Outbound queue; e.g. I want at most 10 messages in processing at a time. The processing part consumes a lot of resources and I don't want to overload the system.

My current configuration of the initial part of the flow is simply as follows:

IntegrationFlows
    .from(Amqp.inboundAdapter(connectionFactory, "Q1"))
    .handle(message -> service1.process(message.getPayload()))
    .get();

Service1 and ServiceN can communicate (they run in the same JVM), so I am able to implement a locking mechanism between them, so that service1.process() blocks before proceeding if the "in processing" message limit is reached. That is what, if I understand correctly, @Gary Russell suggested in this comment. However, that will leave messages picked up from the broker and sitting in an unacked state for a while. Is there a way to just not pick up the messages from the queue at all?
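For illustration, a minimal sketch of such a locking mechanism, assuming a counting semaphore shared by both services. `InFlightLimiter` and its method names are hypothetical, not part of Spring Integration or Spring AMQP; it only uses `java.util.concurrent`.

```java
import java.util.concurrent.Semaphore;

/** Hypothetical helper shared by Service1 and ServiceN (same JVM); not a Spring class. */
final class InFlightLimiter {

    private final Semaphore permits;

    InFlightLimiter(int maxInFlight) {
        // Fair semaphore: blocked consumer threads are released in arrival order.
        this.permits = new Semaphore(maxInFlight, true);
    }

    /** Service1 would call this first in process(); blocks while the limit is reached. */
    void acquire() throws InterruptedException {
        permits.acquire();
    }

    /** Non-blocking variant: returns false immediately if the limit is reached. */
    boolean tryAcquire() {
        return permits.tryAcquire();
    }

    /** ServiceN would call this once the message has reached the outbound queue (or failed). */
    void release() {
        permits.release();
    }

    /** Number of messages that could still be admitted right now. */
    int available() {
        return permits.availablePermits();
    }
}
```

With a limit of 10, Service1 would call `acquire()` before doing any work and ServiceN would call `release()` in a `finally` block, so a failed message does not leak a permit. While `acquire()` blocks, the delivered message stays unacked on the consumer, which is exactly the behaviour described above.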

@Artem Bilan's answer, to use SimpleMessageListenerContainer.stop()/.start(), seems quite heavyweight, judging by the implementation and all the shutdown/startup logic that would be invoked.

Also, both answers are two years old now. Any better suggestions?


Solution

  • Not using the message-driven adapter, no. The broker will push messages to the consumer (according to the prefetch-count).

    I am not sure why you object to the message sitting in an un-ack'd state.

    An alternative is to use a simple polled <int:inbound-channel-adapter/> with, say, 10 threads in the poller; in the POJO the adapter invokes, use a RabbitTemplate to receive() messages. This is less efficient than a message-driven adapter, though.
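    A rough sketch of the polled alternative combined with an in-flight limit, written against plain Java so it stands alone. `ThrottledPoller`, `pollOnce()` and `markDone()` are hypothetical names; the `Supplier` stands in for something like `() -> rabbitTemplate.receive("Q1")` (which returns null when the queue is empty) and the `Consumer` for `service1::process`. The key point is that the permit is taken before asking the broker, so when the limit is reached the message is never fetched.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical polled consumer that only fetches a message when a permit is free. */
final class ThrottledPoller<T> {

    private final Semaphore permits;
    private final Supplier<T> receiver; // assumed stand-in for () -> rabbitTemplate.receive("Q1")
    private final Consumer<T> handler;  // assumed stand-in for service1::process

    ThrottledPoller(int maxInFlight, Supplier<T> receiver, Consumer<T> handler) {
        this.permits = new Semaphore(maxInFlight);
        this.receiver = receiver;
        this.handler = handler;
    }

    /** One poll cycle; returns true if a message was fetched and handed to the handler. */
    boolean pollOnce() {
        if (!permits.tryAcquire()) {
            return false; // limit reached: do not touch the queue at all
        }
        boolean handedOver = false;
        try {
            T message = receiver.get();
            if (message != null) {
                handler.accept(message);
                handedOver = true;
            }
            return handedOver;
        } finally {
            if (!handedOver) {
                permits.release(); // empty queue or handler failure: give the permit back
            }
        }
    }

    /** To be called by the last stage once a message has reached the outbound queue. */
    void markDone() {
        permits.release();
    }
}
```

    In a Spring Integration flow, `pollOnce()` would be the method the poller invokes on each trigger, and ServiceN would call `markDone()` when it finishes a message.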