Search code examples
mongodbmongodb-java

Atomically query for all collection documents + watching for further changes


Our Java app saves its configurations in a MongoDB collections. When the app starts it reads all the configurations from MongoDB and caches them in Maps. We would like to use the change stream API to be able also to watch for updates of the configurations collections. So, upon app startup, first we would like to get all configurations, and from now on - watch for any further change.
Is there an easy way to execute the following atomically:

  1. A find() that retrieves all configurations (documents)
  2. Start a watch() that will send all further updates

By atomically I mean - without potentially missing any update (between 1 and 2 someone could update the collection with new configuration).


Solution

  • To make sure I lose no update notifications, I found that I can use watch().startAtOperationTime(serverTime) (for MongoDB of 4.0 or later), as follows.

    1. Query the MongoDB server for its current time, using command such as Document hostInfoDoc = mongoTemplate.executeCommand(new Document("hostInfo", 1))
    2. Query for all interesting documents: List<C> configList = mongoTemplate.findAll(clazz);
    3. Extract the server time from hostInfoDoc: BsonTimestamp serverTime = (BsonTimestamp) hostInfoDoc.get("operationTime");
    4. Start the change stream configured with the saved server time ChangeStreamIterable<Document> changes = eventCollection.watch().startAtOperationTime(serverTime);

    Since 1 ends before 2 starts, we know that the documents that were returned by 2 were at least same or fresher than the ones on that server time. And any updates that happened on or after this server time will be sent to us by the change stream (I don't care to run again redundant updates, because I use map as cache, so extra add/remove won't make a difference, as long as the last action arrives).

    I think I could also use watch().resumeAfter(_idOfLastAddedDoc) (didn't try). I did not use this approach because of the following scenario: the collection is empty, and the first document is added after getting all (none) documents, and before starting the watch(). In that scenario I don't have previous document _id to use as resume token.

    Update

    Instead of using "hostInfo" for getting the server time, which couldn't be used in our production, I ended using "dbStats" like that:

    Document dbStats= mongoOperations.executeCommand(new Document("dbStats", 1)); BsonTimestamp serverTime = (BsonTimestamp) dbStats.get("operationTime");