c# mongodb changestream

C# - Change Streams in MongoDB with $match


I'm trying to narrow a MongoDB change stream down to a single document by matching on the document's _id, since I have many documents in one collection. Does anyone know how to do this in C#? Here's my latest attempt, which doesn't work:

{
    var userID = "someIdHere";
    var match = new BsonDocument
    {
        {
            "$match",
            new BsonDocument
            {
                {"_id", userID}
            }
        }
    };
    var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Class>>().Match(match);

    var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
    var cursor = collection.Watch(pipeline, options).ToEnumerable();
    foreach (var change in cursor)
    {
        Debug.WriteLine(change.FullDocument.ToJson());
        Debug.WriteLine(change.ResumeToken + " " + change.OperationType);
    }
} 

If I change the cursor to the line below, it works, but it returns everything: I get a change event whenever there is activity on any _id in the collection. That's not what I'm going for.

var cursor = collection.Watch().ToEnumerable();

Solution

  • After a lot of searching, I pieced together information from other issues I found online and came up with the solution below. It works like a charm!

    Not only was I able to filter the change stream so that it only reports updates, I was also able to narrow it down to a SPECIFIC document _id and, more granular still, to changes to a field called LastLogin on that document. This is what I wanted, since the default change stream returns every update that happens on the collection.

    I hope this helps anyone who comes across the same issue I did. Cheers.

    {
        var db = client.GetDatabase(dbName);
        var collectionDoc = db.GetCollection<BsonDocument>(collectionName);
        var id = "someID";
    
        //Get the whole document instead of just the changed portion
        var options = new ChangeStreamOptions
        {
            FullDocument = ChangeStreamFullDocumentOption.UpdateLookup
        };
    
        //Match only update operations on the document with the given _id
        //where the updated field is LastLogin.
        var filter = "{ $and: [ { operationType: 'update' }, " +
                     "{ 'fullDocument._id' : '" + id + "' }, " +
                     "{ 'updateDescription.updatedFields.LastLogin': { $exists: true } } ] }";
    
        var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>().Match(filter);
    
        var changeStream = collectionDoc.Watch(pipeline, options).ToEnumerable().GetEnumerator();
    
        try
        {
            while (changeStream.MoveNext())
            {
                var next = changeStream.Current;
                Debug.WriteLine("PRINT-OUT:" + next.ToJson());
            }
        }
        catch (Exception ex)
        {
            Debug.WriteLine("PRINT-OUT: " + ex);
        }
        finally
        {
            changeStream.Dispose();
        }
    }
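
A note on why the attempt in the question likely went wrong: Match() already emits the $match stage, so passing it a document that itself contains "$match" nests the operator, and at the top level of a change event _id is the resume token rather than the document's own _id. The sketch below is one possible variant of the solution above, not the only way to do it. It builds the filter as a BsonDocument instead of concatenating a JSON string, and it matches on documentKey._id. The class and method names (LastLoginWatcher, BuildFilter, Watch) are made up for illustration, and it assumes the _id is stored as a string, as in the question.

```csharp
using System.Diagnostics;
using MongoDB.Bson;
using MongoDB.Driver;

public static class LastLoginWatcher
{
    // Hypothetical helper: builds the body of the $match stage.
    // Passing the id as a value avoids quoting problems in a hand-built JSON string.
    public static BsonDocument BuildFilter(string id)
    {
        return new BsonDocument
        {
            { "operationType", "update" },
            // documentKey._id holds the _id of the changed document
            // (assumed to be a string here; use ObjectId if that is what you store).
            { "documentKey._id", id },
            { "updateDescription.updatedFields.LastLogin", new BsonDocument("$exists", true) }
        };
    }

    public static void Watch(IMongoCollection<BsonDocument> collection, string id)
    {
        // Return the whole document, not just the changed fields.
        var options = new ChangeStreamOptions
        {
            FullDocument = ChangeStreamFullDocumentOption.UpdateLookup
        };

        // Match() adds the $match stage itself, so only the filter body is passed in.
        FilterDefinition<ChangeStreamDocument<BsonDocument>> filter = BuildFilter(id);
        var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
            .Match(filter);

        using (var cursor = collection.Watch(pipeline, options))
        {
            // Blocks and yields each matching change event as it arrives.
            foreach (var change in cursor.ToEnumerable())
            {
                // FullDocument can be null if the document was removed before the lookup.
                Debug.WriteLine(change.FullDocument?.ToJson());
                Debug.WriteLine(change.ResumeToken + " " + change.OperationType);
            }
        }
    }
}
```

Usage would be something like LastLoginWatcher.Watch(db.GetCollection<BsonDocument>(collectionName), "someID"). Listing the three conditions in a single document is an implicit AND, so the explicit $and from the string version is not needed here.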