node.js mongodb mongoose changestream

Unable to create a watch on multiple collections in MongoDB


I am trying to create a watch on a collection in a MongoDB database using the following Mongoose construct:

collection.watch({ fullDocument: "updateLookup" })

My entire function is below:
exports.createWatchOnAuthCollection = (site, type) => {
    if (watchedTypes.includes(type + '_' + site)) {
        console.log('Ignoring.. Already watch added on this type and site ' + type + '_' + site);
        return;
    }
    dbConnectionPool.getDBConnection('auth_' + site, function (dbConnection) {
        if (type == 'unit_status') {
            console.log('Trying to add watch on ' + site);
            var collection = dbConnection.collection('units');
            collection.watch({
                fullDocument: "updateLookup"
            })
                .on('change', function (change) {
                    handleStatusUpdate(site, change);
                })
                .on('error', function (error) {
                    console.log('Got an error reply', error);
                });
            watchedTypes.push(type + '_' + site);
        } else if (type == 'iatas') {
            
        }
    });
}

The problem is that when I call this function in a loop to create watches on multiple collections, only the watch on the most recently created collection works and has its callback invoked. The watches on the other collections never fire. The calling code is below:

sites.forEach(site => {
    controller.createWatchOnAuthCollection(site, watchType);
})

Thanks in advance..:)


Solution

  • You cannot use the same session to create multiple change stream listeners, so you need to specify a different session or use a different connection for each stream.

    Also note that having many streams open concurrently can hurt performance, so it is recommended to open a single stream on the db or connection object and filter for the collections you want to monitor. For example:

    ...
    const collections = [];
    sites.forEach(site => {
      // For each collection to watch, add a namespace filter to the collections array
      collections.push({ "db": "auth_" + site, "coll": "units" });
    });
    
    // Create a change stream on the deployment and keep only
    // the collections we want. The returned stream emits 'change' events,
    // and change.ns identifies the source db and collection.
    const changeStream = client.watch([ { "$match": { "ns": { "$in": collections } } } ],
        { "fullDocument": "updateLookup" });
    ...
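
With a single stream, each event still has to be routed back to the right site, which the per-collection version got for free from its closure. The following is a minimal sketch of one way to do that, not a definitive implementation. The helper names buildWatchPipeline, siteFromChange and watchUnits are made up for illustration; client is assumed to be a connected MongoClient, and handleStatusUpdate is the handler from the question. A deployment-wide client.watch() also assumes MongoDB 4.0 or later running as a replica set or sharded cluster.

```javascript
// Sketch only: buildWatchPipeline, siteFromChange and watchUnits are
// illustrative names, not part of the original code or of the driver.

// Build the $match stage that keeps only events from auth_<site>.units.
function buildWatchPipeline(sites) {
  const namespaces = sites.map(site => ({ db: 'auth_' + site, coll: 'units' }));
  return [{ $match: { ns: { $in: namespaces } } }];
}

// Recover the site name from a change event's namespace, or return null
// if the event does not come from an auth_<site> database.
function siteFromChange(change) {
  const prefix = 'auth_';
  const db = change && change.ns ? change.ns.db : undefined;
  if (typeof db !== 'string' || !db.startsWith(prefix)) return null;
  return db.slice(prefix.length);
}

// Open one deployment-wide stream and pass each event to the handler
// together with the site it belongs to.
// Assumption: `client` is a connected MongoClient.
function watchUnits(client, sites, handleStatusUpdate) {
  const stream = client.watch(buildWatchPipeline(sites), { fullDocument: 'updateLookup' });
  stream.on('change', change => {
    const site = siteFromChange(change);
    if (site !== null) handleStatusUpdate(site, change);
  });
  stream.on('error', error => console.error('Change stream error:', error));
  return stream;
}
```

Calling watchUnits(client, sites, handleStatusUpdate) once would then replace the sites.forEach loop from the question. Keeping the returned stream around makes it possible to call close() on it during shutdown.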