I am setting up a ChangeStream to notify me when a document has changed in a collection so that I can upsert the "LastModified" element for that document to the time of the event. Since this update will cause a new event to occur on the ChangeStream, I need to filter out these updates to prevent an infinite loop (updating the LastModified element because the LastModified element was just updated...).
I have the following code that is working when I specify the exact field:
ChangeStreamOptions options = new ChangeStreamOptions();
options.ResumeAfter = resumeToken;
string filter = "{ $and: [ { operationType: { $in: ['replace','insert','update'] } }, { 'updateDescription.updatedFields.LastModified': { $exists: false } } ] }";
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>().Match(filter);
var cursor = collection.Watch(pipeline, options, cancelToken);
However, instead of hard-coding the "updateDescription.updatedFields.LastModified", I would like to provide a list of element names that I don't want to exist in the updatedFields document.
I attempted:
string filter = "{ $and: [ { operationType: { $in: ['replace','insert','update'] } }, { 'updateDescription.updatedFields': { $nin: [ 'LastModified' ] } } ] }";
but this didn't work as expected (I still got the update events for the LastModified change).
I originally was using the Filter Builder:
FilterDefinitionBuilder<ChangeStreamDocument<BsonDocument>> filterBuilder = Builders<ChangeStreamDocument<BsonDocument>>.Filter;
FilterDefinition<ChangeStreamDocument<BsonDocument>> filter = filterBuilder.In("operationType", new string[] { "replace", "insert", "update" }); //Only include the change if it was one of these types. Available types are: insert, update, replace, delete, invalidate
filter &= filterBuilder.Nin("updateDescription.updatedFields", ChangedFieldsToIgnore); //If this is an update, only include it if the field(s) updated contains 1+ fields not in the ChangedFieldsToIgnore list
where ChangedFieldsToIgnore is a List containing the field names that I do not want to get events for.
Can anyone help with the syntax that I need to use? Or do I need to loop over my ChangedFieldsToIgnore list and add a new "$exists: false" entry to the filter for each item? (This doesn't seem very efficient.)
EDIT:
I attempted the following code based on the answer by @wan-bachtiar, but I'm getting an exception on my enumerator.MoveNext() call:
var match1 = new BsonDocument { { "$match", new BsonDocument { { "operationType", new BsonDocument { { "$in", new BsonArray(new string[] { "replace", "insert", "update" }) } } } } } };
var match2 = new BsonDocument { { "$addFields", new BsonDocument { { "tmpfields", new BsonDocument { { "$objectToArray", "$updateDescription.updatedFields" } } } } } };
var match3 = new BsonDocument { { "$match", new BsonDocument { { "tmpfields.k", new BsonDocument { { "$nin", new BsonArray(updatedFieldsToIgnore) } } } } } };
var pipeline = new[] { match1, match2, match3 };
var cursor = collection.Watch<ChangeStreamDocument<BsonDocument>>(pipeline, options, Profile.CancellationToken);
enumerator = cursor.ToEnumerable().GetEnumerator();
enumerator.MoveNext();
ChangeStreamDocument<BsonDocument> doc = enumerator.Current;
The exception is: "{"Invalid field name: \"tmpfields\"."}"
I suspect the problem might be that I'm getting "replace" and "insert" events which do not contain the updateDescription field, so the $addFields/$objectToArray are failing. I'm too new to figure out the syntax, but I think I need to use a filter that does:
{ $match: { "operationType": { $in: ["replace", "insert"] } } }
OR
{ $eq: { "operationType": "update" }} AND { $addFields....}
Also, it appears that the C# driver does not include a Builder that helps with the $addFields and $objectToArray operations. I was only able to use the new BsonDocument {...} method to build the pipeline variable.
ChangedFieldsToIgnore is a List containing the field names that I do not want to get events for.
If you would like to filter based on multiple keys (whether updatedFields contains certain fields), it's easier if you convert the keys to values first.
You can convert the document contained within updatedFields into an array of key/value pairs by utilising the aggregation operator $objectToArray. For example:
pipeline = [
    {"$addFields": {
        "tmpfields": {"$objectToArray": "$updateDescription.updatedFields"}
    }},
    {"$match": {
        "tmpfields.k": {"$nin": ["LastModified", "AnotherUnwantedField"]}
    }}
];
The above aggregation pipeline adds a temporary field called tmpfields. This new field pivots the content of updateDescription.updatedFields, turning {name:value} into [{k:name, v:value}]. Once we have those keys as values, we can use $nin to filter them against a list of unwanted field names.
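To make the matching behaviour concrete, here is a minimal sketch in plain C# (no driver involved) that imitates what the two stages do to a single change event. The class and method names are hypothetical, and the semantics are my understanding of how $nin behaves on an array field: the event passes only when none of the updated keys is in the ignore list, and it also passes when tmpfields is missing or null.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, for illustration only: an in-memory imitation of the pipeline.
public static class UpdatedFieldsFilterSketch
{
    // Imitates {"$objectToArray": "$updateDescription.updatedFields"}:
    // {name: value} becomes [{k: name, v: value}]; a missing document stays null.
    public static List<KeyValuePair<string, object>> ObjectToArray(
        IDictionary<string, object> updatedFields)
    {
        return updatedFields?.ToList();
    }

    // Imitates {"tmpfields.k": {"$nin": ignored}}:
    // passes when tmpfields is null, or when no key appears in the ignore list.
    public static bool PassesFilter(
        IDictionary<string, object> updatedFields,
        ICollection<string> ignored)
    {
        var tmpfields = ObjectToArray(updatedFields);
        return tmpfields == null || !tmpfields.Any(entry => ignored.Contains(entry.Key));
    }
}
```

Note that under this reading an update that touches both LastModified and some other field is dropped as well, because one of its keys is in the ignore list.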
UPDATED
The reason you're getting an exception about tmpfields being invalid is that the result is cast into the ChangeStreamDocument model, which does not have a recognizable field called tmpfields.
For other operations, which do not have the field updateDescription.updatedFields, the value of tmpfields would just be null.
Below is an example of a MongoDB ChangeStream in .NET/C# using the MongoDB .NET driver v2.5, along with an aggregation pipeline that modifies the output change stream.
This example is not type safe and returns BsonDocument:
var database = client.GetDatabase("database");
var collection = database.GetCollection<BsonDocument>("collection");
var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
// Aggregation Pipeline
var addFields = new BsonDocument {
    { "$addFields", new BsonDocument {
        { "tmpfields", new BsonDocument {
            { "$objectToArray", "$updateDescription.updatedFields" }
        } }
    } }
};
var match = new BsonDocument {
    { "$match", new BsonDocument {
        { "tmpfields.k", new BsonDocument {
            { "$nin", new BsonArray { "LastModified", "Unwanted" } }
        } }
    } }
};
var pipeline = new[] { addFields, match };
// ChangeStreams
var cursor = collection.Watch<BsonDocument>(pipeline, options);
foreach (var change in cursor.ToEnumerable())
{
    Console.WriteLine(change.ToJson());
}
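If keeping the typed ChangeStreamDocument result matters more than avoiding a loop, the per-field "$exists: false" approach from the question can be generated from the list once, when the stream is set up. The sketch below is only one possible way to do that; the class and method names are hypothetical, and it assumes the field names are simple top-level names containing no quotes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, for illustration only: builds the JSON filter string
// in the same shape as the hard-coded filter in the question.
public static class ChangeStreamFilterBuilder
{
    public static string BuildFilter(IEnumerable<string> fieldsToIgnore)
    {
        // Always restrict the stream to the operation types of interest.
        var clauses = new List<string>
        {
            "{ operationType: { $in: ['replace','insert','update'] } }"
        };

        // One "$exists: false" clause per ignored field.
        // Assumption: names are plain identifiers, so no escaping is done.
        clauses.AddRange(fieldsToIgnore.Select(name =>
            "{ 'updateDescription.updatedFields." + name + "': { $exists: false } }"));

        return "{ $and: [ " + string.Join(", ", clauses) + " ] }";
    }
}
```

The returned string can be passed to Match(filter) on the EmptyPipelineDefinition exactly as in the question's working code. Because no extra field is added to the event, the tmpfields deserialization problem does not arise.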