I am trying to filter collection changes, selecting all updated and deleted documents plus only those inserted documents that satisfy a particular filter. The old-style filter (working on the oplog) was:
// filter generated by settings
FilterDefinition<BsonDocument> filter = GetFilterFromOverriddenMethod();
// interested only in u, d and filtered i documents
filter = Builders<BsonDocument>.Filter.Or(
    Builders<BsonDocument>.Filter.In("op", new[] { "u", "d" }),
    Builders<BsonDocument>.Filter.And(
        Builders<BsonDocument>.Filter.Eq("op", "i"),
        filter
    )
);
// filter applied on oplog collection
filter = Builders<BsonDocument>.Filter.And(
    Builders<BsonDocument>.Filter.Eq("ns", "dbName.collectionName"),
    Builders<BsonDocument>.Filter.Gt("ts", lastId),
    filter
);
I am trying to reproduce (more or less) the same behavior using collection.WatchAsync, and I cannot modify GetFilterFromOverriddenMethod. I tried using a pipeline:
FilterDefinition<BsonDocument> filter = GetFilterFromOverriddenMethod();
var deleteUpdateFilter = Builders<ChangeStreamDocument<BsonDocument>>.Filter.In(
    change => change.OperationType,
    new[] { ChangeStreamOperationType.Update, ChangeStreamOperationType.Delete }
);
var insertReplaceFilter = Builders<ChangeStreamDocument<BsonDocument>>.Filter.In(
    change => change.OperationType,
    new[] { ChangeStreamOperationType.Insert, ChangeStreamOperationType.Replace }
);
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
    .Match(deleteUpdateFilter |
        (insertReplaceFilter &
            Builders<ChangeStreamDocument<BsonDocument>>.Filter.**??**(change => change.FullDocument, filter)
        )
    );
but I am not able to integrate filter, since a FilterDefinition<BsonDocument> is not applicable where a FilterDefinition<ChangeStreamDocument<BsonDocument>> is expected.
I also tried to print the pipeline, in order to inspect its details and generate the filter from the serialized pipeline, but I am not able to print a FilterDefinition<ChangeStreamDocument<BsonDocument>>:
var documentSerializer = BsonSerializer.SerializerRegistry.GetSerializer<ChangeStreamDocument<BsonDocument>>();
var renderedFilter = deleteUpdateFilter.Render(documentSerializer, BsonSerializer.SerializerRegistry);
Console.WriteLine(renderedFilter.ToString());
This throws an exception when calling GetSerializer:
System.MissingMethodException: No parameterless constructor defined for type 'MongoDB.Driver.ChangeStreamDocumentSerializer`1[MongoDB.Bson.BsonDocument]'.
Does anyone have a suggestion?
I found some information, which I am sharing in case anyone needs it.
The filter must refer to fields under fullDocument (for example Builders<BsonDocument>.Filter.Eq("fullDocument.foo", true)).
It is possible to use a FilterDefinition<T> in a FilterDefinition<U> by rendering it and creating a BsonDocumentFilterDefinition, for example:
var filter = Builders<BsonDocument>.Filter.Eq("fullDocument.foo", true);
var documentSerializer = BsonSerializer.SerializerRegistry.GetSerializer<BsonDocument>();
var renderedFilter = filter.Render(documentSerializer, BsonSerializer.SerializerRegistry);
var subFilter = new BsonDocumentFilterDefinition<ChangeStreamDocument<BsonDocument>>(renderedFilter);
var pipelineFilter =
    Builders<ChangeStreamDocument<BsonDocument>>.Filter.In(
        change => change.OperationType,
        new[] { ChangeStreamOperationType.Update, ChangeStreamOperationType.Delete }
    ) |
    (
        Builders<ChangeStreamDocument<BsonDocument>>.Filter.In(
            change => change.OperationType,
            new[] { ChangeStreamOperationType.Insert, ChangeStreamOperationType.Replace }
        ) &
        subFilter
    );
Furthermore, rendering a FilterDefinition<ChangeStreamDocument<BsonDocument>> is really simple (thanks to https://www.mongodb.com/community/forums/t/how-can-i-safely-serialize-change-stream-documents-with-c/2335): just instantiate a proper serializer (so obvious!)
var changeStreamSerializer = new ChangeStreamDocumentSerializer<BsonDocument>(BsonDocumentSerializer.Instance);
and then call Render the usual way.
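
Putting the pieces above together, here is a minimal sketch of how the combined filter might be built, printed and passed to WatchAsync. It assumes the same 2.x driver as the snippets above, where Render takes a serializer and a serializer registry (newer driver versions changed that signature), and it assumes the incoming filter already addresses fields as "fullDocument.<field>". The class name ChangeStreamFilterSketch and the method names are hypothetical, chosen only for illustration.

```csharp
using System;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Bson.Serialization.Serializers;
using MongoDB.Driver;

// Hypothetical helper class; names are illustrative, not part of the driver.
public static class ChangeStreamFilterSketch
{
    // Every update/delete, plus insert/replace events that match documentFilter.
    // ASSUMPTION: documentFilter already uses "fullDocument.<field>" paths.
    public static FilterDefinition<ChangeStreamDocument<BsonDocument>> BuildFilter(
        FilterDefinition<BsonDocument> documentFilter)
    {
        // Render the BsonDocument filter to raw BSON (2.x two-argument Render).
        BsonDocument rendered = documentFilter.Render(
            BsonDocumentSerializer.Instance, BsonSerializer.SerializerRegistry);

        // Re-wrap the raw BSON as a filter over change stream documents.
        var subFilter =
            new BsonDocumentFilterDefinition<ChangeStreamDocument<BsonDocument>>(rendered);

        var builder = Builders<ChangeStreamDocument<BsonDocument>>.Filter;
        var updateOrDelete = builder.In(
            change => change.OperationType,
            new[] { ChangeStreamOperationType.Update, ChangeStreamOperationType.Delete });
        var insertOrReplace = builder.In(
            change => change.OperationType,
            new[] { ChangeStreamOperationType.Insert, ChangeStreamOperationType.Replace });

        return updateOrDelete | (insertOrReplace & subFilter);
    }

    // Prints the combined filter, then watches the collection with it.
    public static async Task WatchAsync(
        IMongoCollection<BsonDocument> collection,
        FilterDefinition<BsonDocument> documentFilter)
    {
        var filter = BuildFilter(documentFilter);

        // The serializer must be constructed explicitly; the registry cannot create it.
        var changeStreamSerializer =
            new ChangeStreamDocumentSerializer<BsonDocument>(BsonDocumentSerializer.Instance);
        Console.WriteLine(
            filter.Render(changeStreamSerializer, BsonSerializer.SerializerRegistry));

        var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
            .Match(filter);

        using (var cursor = await collection.WatchAsync(pipeline))
        {
            while (await cursor.MoveNextAsync())
            {
                foreach (var change in cursor.Current)
                {
                    Console.WriteLine($"{change.OperationType}: {change.DocumentKey}");
                }
            }
        }
    }
}
```

Since the document filter is only combined with insert and replace events, which carry the full document by default, this sketch does not set any ChangeStreamOptions. Running it requires a replica set or sharded cluster, because change streams are not available on a standalone server.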