I am using the ReactiveMongoTemplate change stream support to listen for updates to a MongoDB collection, run some queries, and persist a document to another collection. It worked fine locally, but after deployment to UAT, which has a much higher volume, it started failing with the error below:
Too many operations are already waiting for a collection. Max number of operations (maxWaitQueueSize) of 500 has been exceeded.
What are the ways to resolve this?
I have the following in my application.yml file:
spring:
  data:
    mongodb:
      uri: mongodb://host:port/db?authMechanism=<val1>&authSource=<val2>&authMechanismProperties=<val3>
And this is what the simplified change stream listener looks like:
@Autowired
ReactiveMongoTemplate reactiveMongoTemplate;

reactiveMongoTemplate
    .changeStream(Sample.class)
    .watchCollection("sample_collection")
    .filter(
        new Criteria().orOperator(
            where("operationType").is("update"),
            where("operationType").is("insert")
        )
    )
    .listen()
    // processMessage queries several collections (including the one being
    // watched) and upserts into a different collection in the same MongoDB
    .flatMap(r -> processMessage(r))
    .subscribeOn(Schedulers.boundedElastic())
    .subscribe();
I understand I might need to configure connection pooling to handle multiple connections, but how do I do that with Reactive MongoDB? I am new to reactive programming. Any pointers would be really helpful.
There are a couple of things you can do here:
Check whether any long-running blocking calls are being made. A blocked thread keeps its connection occupied with the heavy task, so further operations need additional connections or have to queue up waiting for one. Try to optimise the code that makes those blocking calls.
In reactive code, you can detect blocking calls with BlockHound.
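Increase the connection limit by specifying waitQueueMultiple or maxPoolSize in the connection string. The limit of 500 in the error message corresponds to the Java driver defaults of maxPoolSize 100 multiplied by waitQueueMultiple 5. See https://docs.mongodb.com/manual/reference/connection-string/#connection-pool-options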
Before that, you can check your MongoDB server stats to see the current and available connections using:
db.serverStatus().connections
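To make the pool options above more concrete, here is a minimal sketch of how the wait queue limit relates to the two settings and what the adjusted connection string could look like. It assumes the 3.x-era Java driver that produces this error, where the wait queue capacity is maxPoolSize multiplied by waitQueueMultiple. PoolSizing and its methods are hypothetical helpers written for illustration, not part of the driver or Spring, and the values 200 and 10 are arbitrary examples rather than recommendations.

```java
// Sketch only: PoolSizing and its methods are hypothetical helpers, not driver API.
final class PoolSizing {

    // Wait queue capacity as derived from the two connection string options:
    // maxPoolSize * waitQueueMultiple (driver defaults: 100 * 5 = 500).
    static int maxWaitQueueSize(int maxPoolSize, int waitQueueMultiple) {
        return maxPoolSize * waitQueueMultiple;
    }

    // Appends the two pool options to a connection string, using '&' when the
    // URI already has a query part and '?' otherwise.
    static String withPoolOptions(String uri, int maxPoolSize, int waitQueueMultiple) {
        String separator = uri.contains("?") ? "&" : "?";
        return uri + separator
                + "maxPoolSize=" + maxPoolSize
                + "&waitQueueMultiple=" + waitQueueMultiple;
    }

    public static void main(String[] args) {
        // The defaults reproduce the limit from the error message.
        System.out.println(maxWaitQueueSize(100, 5)); // 500

        // Example values (assumptions): 200 connections, multiplier of 10.
        System.out.println(maxWaitQueueSize(200, 10)); // 2000
        System.out.println(
                withPoolOptions("mongodb://host:port/db?authSource=<val2>", 200, 10));
    }
}
```

The resulting URI would go into spring.data.mongodb.uri in application.yml. Raising these limits only buys headroom: if processMessage keeps issuing operations faster than the pool can serve them, the queue will fill up again, so it is worth checking for blocking calls first. As far as I know, newer 4.x drivers no longer enforce a wait queue limit, so waitQueueMultiple may have no effect there.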