I want to be notified when a single document in a collection is updated. I expected MongoDB to notify a .NET client that has registered a 'watch' only when that particular document changes, but in practice the change stream returns every document in the collection that receives an 'update' operation, no matter how the "match" conditions of the filter are set.
Can anyone with Change Stream experience help? Is this simply how MongoDB change streams are designed to work?
Below are the relevant pieces of the .NET C# client test code.
The class:
public class UserInfo
{
    [BsonId, BsonRepresentation(BsonType.ObjectId)]
    public string Id { get; set; }
    [BsonElement("UserName", Order = 1), BsonRepresentation(BsonType.String)]
    public string UserName { get; set; }
    [BsonElement("Password", Order = 2), BsonRepresentation(BsonType.String)]
    public string Password { get; set; }
    [BsonElement("LastName", Order = 3), BsonRepresentation(BsonType.String)]
    public string LastName { get; set; }
    [BsonElement("FirstName", Order = 4), BsonRepresentation(BsonType.String)]
    public string FirstName { get; set; }
    [BsonElement("Email", Order = 5), BsonRepresentation(BsonType.String)]
    public string Email { get; set; }
}
Pipeline of filter conditions:
collection = myDB.GetCollection<UserInfo>("Users");
var options = new ChangeStreamOptions
{
    FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
};
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<UserInfo>>();
pipeline.Match(g => g.OperationType == ChangeStreamOperationType.Update)
    .Match(o => o.FullDocument.UserName.Contains("Alice"))
    .Match(t => t.UpdateDescription.UpdatedFields.ToString().Contains(nameof(UserInfo.Password)));
Task watchTask = WatchCollection();
Change event processing routine:
private static async Task WatchCollection()
{
    var cursor = await collection.WatchAsync(pipeline, options);
    Debug.WriteLine("ChangeStream started.");
    await cursor.ForEachAsync(change =>
    {
        Debug.WriteLine("Matched UserName: " + change.FullDocument.UserName);
    });
}
With the routine above, change events do not follow the filter conditions: every time it is triggered, 'change.FullDocument.UserName' prints the user name of every document in the collection that receives an 'update' operation, which is quite weird.
I was able to get it to work with the following:
var options = new ChangeStreamOptions
{
    FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
    BatchSize = 1
};
var filter = Builders<ChangeStreamDocument<UserInfo>>
    .Filter.Where(x =>
        x.OperationType == ChangeStreamOperationType.Update &&
        x.FullDocument.UserName.Contains("Alice"));
filter &= Builders<ChangeStreamDocument<UserInfo>>.Filter.Exists("updateDescription.updatedFields.Password");
var pipeline = new IPipelineStageDefinition[]
{
    PipelineStageDefinitionBuilder.Match(filter)
};
using (var cursor = await collection.WatchAsync<ChangeStreamDocument<UserInfo>>(pipeline, options))
{
    while (await cursor.MoveNextAsync())
    {
        foreach (var info in cursor.Current)
        {
            Console.WriteLine("Updated: " + info.FullDocument.UserName);
        }
    }
}
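A likely reason the pipeline in the question had no effect: as far as I can tell, the driver's pipeline definitions are immutable, so Match(...) returns a new pipeline instead of modifying the one it is called on. In the question the result of the Match chain is discarded, which leaves 'pipeline' empty, so every change event comes through. Below is a minimal, untested sketch of the same fluent style with the result kept. The names WatchSketch and WatchAliceAsync are made up for illustration, UserInfo is the class from the question, and the field path string is the one used in the filter above.

```csharp
using System;
using System.Threading.Tasks;
using MongoDB.Driver;

// Hypothetical helper class, for illustration only.
public static class WatchSketch
{
    // 'collection' is assumed to be the Users collection from the question.
    public static async Task WatchAliceAsync(IMongoCollection<UserInfo> collection)
    {
        var options = new ChangeStreamOptions
        {
            FullDocument = ChangeStreamFullDocumentOption.UpdateLookup
        };

        // Match(...) returns a new pipeline definition rather than changing
        // the existing one, so the result of the whole chain is assigned.
        var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<UserInfo>>()
            .Match(x => x.OperationType == ChangeStreamOperationType.Update)
            .Match(x => x.FullDocument.UserName.Contains("Alice"))
            .Match(Builders<ChangeStreamDocument<UserInfo>>.Filter
                .Exists("updateDescription.updatedFields.Password"));

        using (var cursor = await collection.WatchAsync(pipeline, options))
        {
            // Only events that pass all three $match stages should arrive here.
            await cursor.ForEachAsync(change =>
                Console.WriteLine("Updated: " + change.FullDocument.UserName));
        }
    }
}
```

The Exists filter on the field path replaces the UpdatedFields.ToString().Contains(...) expression from the question, since that check is meant to run on the server as part of the $match stage.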
If you don't mind using a library, all of the above song and dance can be avoided and things can be distilled down to the following:
var watcher = DB.Watcher<UserInfo>("on-alice-updates-password");
watcher.Start(
    eventTypes: EventType.Updated,
    filter: b => b.Where(x => x.FullDocument.UserName == "Alice") &
                 b.Exists("updateDescription.updatedFields.Password"));
watcher.OnChanges += docs =>
{
    foreach (var doc in docs)
        Console.WriteLine("Updated: " + doc.UserName);
};
Check out the MongoDB.Entities docs for more info. Disclaimer: I'm the author of that library.