Our Java app saves its configurations in MongoDB collections. When the app starts, it reads all the configurations from MongoDB and caches them in Maps. We would like to use the change stream API so that we can also watch for updates to the configuration collections.
So, upon app startup, we would first like to get all configurations and, from then on, watch for any further changes.
Is there an easy way to execute the following atomically:
1. find(), which retrieves all configurations (documents)
2. watch(), which sends all further updates
By atomically I mean without potentially missing any update (between 1 and 2, someone could update the collection with a new configuration).
To make sure I lose no update notifications, I found that I can use watch().startAtOperationTime(serverTime) (for MongoDB 4.0 or later), as follows.
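// Run a cheap command first; its response carries the server's operationTime.
Document hostInfoDoc = mongoTemplate.executeCommand(new Document("hostInfo", 1));
// Load all configurations; the result is at least as fresh as that operationTime.
List<C> configList = mongoTemplate.findAll(clazz);
BsonTimestamp serverTime = (BsonTimestamp) hostInfoDoc.get("operationTime");
// Watch from serverTime onward so that updates made during the load are not missed.
ChangeStreamIterable<Document> changes = eventCollection.watch().startAtOperationTime(serverTime);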
Since the hostInfo command completes before findAll() starts, the documents returned by findAll() are at least as fresh as the state at that server time. Any updates that happened at or after this server time will be sent to us by the change stream. (I don't mind re-applying redundant updates: I use a map as the cache, so an extra add or remove makes no difference, as long as the last action arrives.)
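To illustrate why redundant events are harmless, here is a minimal sketch of such a map-backed cache. ConfigCache, its String keys and values, and the method names are hypothetical stand-ins for the real configuration classes. No MongoDB calls are involved; the change events are simulated by hand. The point is only that put and remove are idempotent, so replaying events the initial load already reflects leaves the cache in the same final state, provided the events arrive in order.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical cache keyed by document _id; String values stand in for configuration documents.
class ConfigCache {
    private final Map<String, String> byId = new HashMap<>();

    // Insert, update, and replace events all reduce to an idempotent put.
    void upsert(String id, String config) {
        byId.put(id, config);
    }

    // Removing an absent key is a no-op, so a replayed delete is harmless.
    void delete(String id) {
        byId.remove(id);
    }

    // Defensive copy so callers cannot mutate the cache.
    Map<String, String> snapshot() {
        return new HashMap<>(byId);
    }

    public static void main(String[] args) {
        ConfigCache cache = new ConfigCache();

        // Simulated initial load: it already reflects two updates made after serverTime.
        cache.upsert("a", "v3");

        // Simulated change stream replay from serverTime onward, in order.
        cache.upsert("a", "v2"); // stale event: the cache briefly regresses
        cache.upsert("a", "v3"); // the last action arrives and restores the latest value
        cache.upsert("b", "v1");
        cache.delete("b");
        cache.delete("b");       // redundant delete, no effect

        System.out.println(cache.snapshot());
    }
}
```

Note that while stale events are being replayed, the cache can briefly hold an older value than the one the initial load returned; it converges only once the last event for that document has been applied.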
I think I could also use watch().resumeAfter(_idOfLastAddedDoc) (I didn't try it). I did not use this approach because of the following scenario: the collection is empty, and the first document is added after getting all (none) of the documents and before starting the watch(). In that scenario I don't have a previous document _id to use as a resume token.
Update
Instead of using "hostInfo" to get the server time, which couldn't be used in our production environment, I ended up using "dbStats" like this:
Document dbStats = mongoOperations.executeCommand(new Document("dbStats", 1));
BsonTimestamp serverTime = (BsonTimestamp) dbStats.get("operationTime");