Tags: java, mongodb, mongodb-query, reactive-programming, reactive-streams

Subscriber instances in mongodb reactive streams


I have come across the MongoDB Reactive Streams driver, and it seems to be pretty good for asynchronous operations. For every operation we do, we need to specify a subscriber. My query is whether we should create a different subscriber instance for every operation we do. For example, consider this snippet from the MongoDB docs:

// 1. Ordered bulk operation - order is guaranteed
subscriber = new PrintSubscriber<BulkWriteResult>("Bulk write results: %s");
collection.bulkWrite(
  Arrays.asList(new InsertOneModel<>(new Document("_id", 4)),
                new InsertOneModel<>(new Document("_id", 5)),
                new InsertOneModel<>(new Document("_id", 6)),
                new UpdateOneModel<>(new Document("_id", 1),
                                     new Document("$set", new Document("x", 2))),
                new DeleteOneModel<>(new Document("_id", 2)),
                new ReplaceOneModel<>(new Document("_id", 3),
                                      new Document("_id", 3).append("x", 4)))
  ).subscribe(subscriber);
subscriber.await();

Here, it's only performing a single set of bulk write operations. Suppose I run these operations in batches inside a loop, like this:

while (someresultset.hasNext()) {
  list.add(someresultset.getNext());

  if (list.size() >= 10000) {
    // Can I use the same subscriber instance declared outside of this loop,
    // or should I create the subscriber instance every time?
    doWrites();
    list = new ArrayList<>(); // start a new batch only after the full one is written
  }
}

Solution

  • My query is whether we should create a different subscriber instance for every operation we do

    Yes, you must create a different subscriber instance each time you subscribe. You're subscribing to a Reactive Streams Publisher, whose documentation states:

    A Subscriber should only subscribe once to a single Publisher.
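
    As a rough illustration of what that means for the batching loop in the question, here is a minimal sketch that creates a fresh subscriber for every batch. It is not the MongoDB driver's code. To stay self-contained it uses the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the org.reactivestreams interfaces the driver actually uses. AwaitingSubscriber, FakeBulkWrite, and BatchWriteSketch are hypothetical names: the first is loosely modeled on the docs' PrintSubscriber, and the second stands in for collection.bulkWrite(...) by emitting the batch size as its "result".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;

// Hypothetical single-use subscriber: collects results and lets the caller block until done.
class AwaitingSubscriber<T> implements Flow.Subscriber<T> {
    private final List<T> received = new ArrayList<>();
    private final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;
    private Throwable error;

    @Override
    public synchronized void onSubscribe(Flow.Subscription s) {
        if (subscription != null) {
            s.cancel(); // already subscribed once: refuse the second subscription
            return;
        }
        subscription = s;
        s.request(Long.MAX_VALUE);
    }

    @Override
    public synchronized void onNext(T item) { received.add(item); }

    @Override
    public synchronized void onError(Throwable t) { error = t; done.countDown(); }

    @Override
    public void onComplete() { done.countDown(); }

    List<T> await() throws InterruptedException {
        if (!done.await(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("timed out waiting for the publisher");
        }
        synchronized (this) {
            if (error != null) throw new IllegalStateException(error);
            return new ArrayList<>(received);
        }
    }
}

// Hypothetical stand-in for collection.bulkWrite(batch): emits one result, then completes.
class FakeBulkWrite implements Flow.Publisher<Integer> {
    private final int batchSize;

    FakeBulkWrite(int batchSize) { this.batchSize = batchSize; }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private boolean finished;

            @Override
            public void request(long n) {
                if (finished || n <= 0) return;
                finished = true;
                subscriber.onNext(batchSize);
                subscriber.onComplete();
            }

            @Override
            public void cancel() { finished = true; }
        });
    }
}

class BatchWriteSketch {
    // Splits rows into batches and writes each batch with its own subscriber.
    static List<Integer> writeInBatches(List<String> rows, int batchSize) throws InterruptedException {
        List<Integer> results = new ArrayList<>();
        List<String> batch = new ArrayList<>();
        for (String row : rows) {
            batch.add(row);
            if (batch.size() >= batchSize) {
                results.addAll(flush(batch));
                batch = new ArrayList<>();
            }
        }
        if (!batch.isEmpty()) results.addAll(flush(batch)); // write the final partial batch
        return results;
    }

    static List<Integer> flush(List<String> batch) throws InterruptedException {
        // New subscriber instance for every subscribe call.
        AwaitingSubscriber<Integer> subscriber = new AwaitingSubscriber<>();
        new FakeBulkWrite(batch.size()).subscribe(subscriber);
        return subscriber.await();
    }
}
```

    In this sketch, a subscriber that is subscribed a second time cancels the new subscription, so the second operation's result never arrives. A latch-based subscriber that did not guard against reuse would have a different problem: its latch is already released after the first operation, so a later await() would return immediately without waiting for the new write.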