Search code examples
c#mongodbmongodb-.net-driver

MongoDb C# Driver filter WatchAsync on FullDocument fields


I am trying to filter collection changes, selecting only updated, deleted and inserted documents satisfying a particular filter. Old style filter (working on oplog) was

  // filter generated by settings
  FilterDefinition<BsonDocument> filter = GetFilterFromOverriddenMethod();
  // interested only in u, d and filtered i documents
  filter = Builders<BsonDocument>.Filter.Or(
    Builders<BsonDocument>.Filter.In("op", new[] { "u", "d" }),
    Builders<BsonDocument>.Filter.And(
      Builders<BsonDocument>.Filter.Eq("op", "i"),
      filter
    )
  );
  // filter applied on oplog collection
  filter = Builders<BsonDocument>.Filter.And(
    Builders<BsonDocument>.Filter.Eq("ns", "dbName.collectionName",
    Builders<BsonDocument>.Filter.Gt("ts", lastId),
    filter
  );

I am trying to reproduce (more or less) same behavior using collection.WatchAsync, and cannot modify GetFilterFromOverriddenMethod. I tried using a pipeline

  FilterDefinition<BsonDocument> filter = GetFilterFromOverriddenMethod();
  var deleteUpdateFilter = Builders<ChangeStreamDocument<BsonDocument>>.Filter.In(change => 
        change.OperationType, 
        new[] { ChangeStreamOperationType.Update, ChangeStreamOperationType.Delete }
        );
  var insertReplaceFilter = Builders<ChangeStreamDocument<BsonDocument>>.Filter.In(change => 
        change.OperationType, 
        new[] { ChangeStreamOperationType.Insert, ChangeStreamOperationType.Replace }
        );
  var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
    .Match( deleteUpdateFilter |
      (insertReplaceFilter &
        Builders<ChangeStreamDocument<BsonDocument>>.Filter.**??**(change => change.FullDocument, filter)
      )
    );

but I am not able to integrate filter (being FilterDefinition<BsonDocument> not appliable to FilterDefinition<ChangeStreamDocument<BsonDocument>>). I tried also to print the pipeline to hack its details and generate the filter from serialized pipeline, but I am not able to print FilterDefinition<ChangeStreamDocument<BsonDocument>>,

  var documentSerializer = BsonSerializer.SerializerRegistry.GetSerializer<ChangeStreamDocument<BsonDocument>>();

  var renderedFilter = deleteUpdateFilter.Render(documentSerializer, BsonSerializer.SerializerRegistry);
                Console.WriteLine(renderedFilter.ToString());

throws an exception calling GetSerializer

System.MissingMethodException: No parameterless constructor defined for type 'MongoDB.Driver.ChangeStreamDocumentSerializer`1[MongoDB.Bson.BsonDocument]'.

Does anyone have a suggestion?


Solution

  • Found some informations, if anyone need I can share:

    1. it is not possible to preserve filter produced for oplog with streams because oplog filter must explicitly refer to "o" field of oplog item, not present in ChangeStream interface
    2. filter can work using a different prefix, referring to "fullDocument" (filter becomes something like Builders<BsonDocument>.Filter.Eq("fullDocument.foo", true))
    3. It is possible to compose FilterDefinition<T> in a FilterDefinition<U> rendering it and creating a BsonDocumentFilterDefinition, for example
        var filter = Builders<BsonDocument>.Filter.Eq("fullDocument.foo", true);
        var documentSerializer = BsonSerializer.SerializerRegistry.GetSerializer<BsonDocument>();
        var renderedFilter = filter.Render(documentSerializer, BsonSerializer.SerializerRegistry);
        var subFilter = new BsonDocumentFilterDefinition<ChangeStreamDocument<BsonDocument>>(renderedFilter);
    
        var pipelineFilter = 
          Builders<ChangeStreamDocument<BsonDocument>>.Filter.In(
            change => change.OperationType, new[] { ChangeStreamOperationType.Update, ChangeStreamOperationType.Delete }
          ) |
          (
            Builders<ChangeStreamDocument<BsonDocument>>.Filter.In(
              change => change.OperationType, new[] { ChangeStreamOperationType.Insert, ChangeStreamOperationType.Replace }
            ) &
            subFilter
          );
    
    

    Furthermore, rendering FilterDefinition<ChangeStreamDocument<BsonDocument>> is really simple (thanks to https://www.mongodb.com/community/forums/t/how-can-i-safely-serialize-change-stream-documents-with-c/2335), by instancing a proper serializer (so obvious!)

    var changeStreamSerializer = new ChangeStreamDocumentSerializer<BsonDocument>(BsonDocumentSerializer.Instance);
    

    and then calling Render usual way.