My system looks like this:
[Q1] --> Service1 --> [Q2] --> ...(processing)... --> ServiceN --> [Outbound queue]
The queues are RabbitMQ 3.5.6. I'm using Spring Integration 4.2.1, Spring AMQP 1.5.1 and Spring Integration Java DSL 1.1.0.
I would like to throttle the consumption of messages from the queue Q1 by Service1 depending on how many messages are currently being processed and haven't yet reached the Outbound queue, e.g. I want at most 10 messages in processing at a time. That is because the processing part consumes a lot of resources and I don't want to overload the system.
My current configuration of the initial part of the flow is simply as follows:
IntegrationFlows
    .from(Amqp.inboundAdapter(connectionFactory, "Q1"))
    .handle(message -> service1.process(message.getPayload()))
    .get();
Service1 and ServiceN can communicate (it's the same JVM), so I am able to implement a locking mechanism between them so that service1.process() blocks before proceeding with the execution if the "in processing" message limit is reached. That is what, if I understand correctly, @Gary Russell suggested in this comment. It will, however, result in the messages being picked up from the broker and hanging there for a while in an unacked state. Is there a way to just not pick up the messages from the queue at all?
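For reference, the locking mechanism described above could be sketched roughly as follows. This is only a minimal illustration built on java.util.concurrent.Semaphore; the class InFlightLimiter and its method names are hypothetical and are not part of Spring Integration or Spring AMQP. The assumption is that service1.process() calls beforeProcessing() first and ServiceN calls afterOutbound() once the message has been written to the Outbound queue.

```java
import java.util.concurrent.Semaphore;

// Hypothetical helper shared by Service1 and ServiceN (same JVM).
final class InFlightLimiter {

    // One permit per message that is allowed to be "in processing".
    private final Semaphore permits;

    InFlightLimiter(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    // Called at the start of service1.process(): blocks the calling
    // (listener) thread until a permit is free.
    void beforeProcessing() {
        permits.acquireUninterruptibly();
    }

    // Non-blocking variant: returns false immediately if the limit is reached.
    boolean tryBeforeProcessing() {
        return permits.tryAcquire();
    }

    // Called by ServiceN after the message has reached the Outbound queue.
    // It would also have to be called on any failure path, otherwise
    // permits leak and the flow eventually stalls.
    void afterOutbound() {
        permits.release();
    }

    // Number of messages that could still be admitted right now.
    int available() {
        return permits.availablePermits();
    }
}
```

With new InFlightLimiter(10), the eleventh call to beforeProcessing() blocks until some message reaches ServiceN. While the listener thread is blocked, the message it holds (and anything already prefetched) stays unacked on the broker, which is exactly the behaviour I am asking about.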
The answer of @Artem Bilan to use SimpleMessageListenerContainer.stop()/.start() seems quite heavyweight looking at the implementation and all the shutdown/startup logic that would be invoked.
Also both answers are two years old now. Any better suggestions?
Not using the message-driven adapter, no. The broker will push messages to the consumer (according to the prefetch-count).
I am not sure why you object to the message sitting in an un-ack'd state.
An alternative is to use a simple polled <int:inbound-channel-adapter/>, with, say, 10 threads in the poller, and in the POJO the adapter invokes, use a RabbitTemplate to receive() messages, but this is less efficient than a message-driven adapter.