Tags: c#, mongodb, mongodb-query, mongodb-.net-driver, azure-cosmosdb-mongoapi

How to do an upsert into a collection, but no-op if the persisted document has the same or later version?


I'd like to upsert a document into an existing collection (insert it if no document with a matching Id exists, replace it if one does), with one caveat: the document has an integer Version property, and if the document already persisted in the collection has the same or a higher version than the one being upserted, the upsert should do nothing.

Given a test document definition like this:

public record TestDocument(string Id, int Version, string SomeDataString);

We should expect that:

  • upserting a document with Id="Foo" and Version=[anything] works if there's no Id="Foo" in the collection yet, inserting the new document
  • upserting a document with Id="Foo" and Version=2 works if there's an Id="Foo" document with Version of 1 (anything less than 2), replacing the existing document
  • upserting a document with Id="Foo" and Version=2 does nothing if there's an Id="Foo" document with Version of 2 or higher
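The three rules above can be pinned down as a tiny in-memory model, which may also serve as an oracle in tests. This is only a sketch: `ConditionalUpsertModel` and its `Upsert` method are hypothetical names, not part of the MongoDB driver, and a dictionary keyed by Id stands in for the collection.

```csharp
using System.Collections.Generic;

public record TestDocument(string Id, int Version, string SomeDataString);

// Hypothetical in-memory model of the intended semantics (not a MongoDB API).
public static class ConditionalUpsertModel
{
    // Returns true if the store changed (insert or replace), false for a no-op.
    public static bool Upsert(IDictionary<string, TestDocument> store, TestDocument incoming)
    {
        if (store.TryGetValue(incoming.Id, out var persisted) && persisted.Version >= incoming.Version)
        {
            return false; // same or later version already persisted: do nothing
        }

        store[incoming.Id] = incoming; // nothing persisted yet, or the persisted version is older
        return true;
    }
}
```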

Since Id is enforced as unique, I can get this functionality in an ugly way: do a replace with IsUpsert = true whose filter matches on Id and on Version less than the new document's version, then catch the duplicate-key exception that is thrown when the persisted document has the same or a higher version (the filter matches nothing, so the server tries to insert a second document with the same Id).

var idMatchFilter = Builders<TestDocument>.Filter.Eq(e => e.Id, upsertDocument.Id);
var versionFilter = Builders<TestDocument>.Filter.Lt(e => e.Version, upsertDocument.Version);
var combinedFilter = Builders<TestDocument>.Filter.And(idMatchFilter, versionFilter);

var replaceOptions = new ReplaceOptions() { IsUpsert = true };
try
{
    await testCollection.ReplaceOneAsync(combinedFilter, upsertDocument, replaceOptions);
}
catch (MongoWriteException mongoWriteException) when (mongoWriteException.WriteError?.Category == ServerErrorCategory.DuplicateKey)
{
    // expected possible error when we attempt to insert an older document - ideally this would be a no-op instead of attempted write
}

However, the actual use case is a bulk upsert (say 1000 documents) of a much more complicated document; we can't just set one or two properties, we need to replace the whole document. So this doesn't seem like a great approach, if it's even feasible. It's also against Azure Cosmos DB, although I'm hoping that won't matter and that anything that works against regular MongoDB will work there as well.
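On the feasibility question, a sketch: the try/catch approach can in principle be carried over to a batch by sending one `ReplaceOneModel` per document in an unordered `BulkWriteAsync` call (a failing operation does not stop the remaining ones) and then ignoring a `MongoBulkWriteException` whose errors are all duplicate-key errors. It still attempts the stale inserts, so it does not meet the "no-op" goal; it only shows the bulk case is possible. `StaleSkippingBulkUpsert` is a hypothetical name, the sketch assumes Id is the only unique index (otherwise a genuine duplicate-key violation would be swallowed too), and it has not been checked against Cosmos DB.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using MongoDB.Driver;

public record TestDocument(string Id, int Version, string SomeDataString);

// Hypothetical helper that scales the try/catch approach to a batch.
public static class StaleSkippingBulkUpsert
{
    // One upsert per document; the filter only matches a persisted document
    // with the same Id and a strictly lower Version.
    public static List<ReplaceOneModel<TestDocument>> BuildModels(IEnumerable<TestDocument> documents)
    {
        var f = Builders<TestDocument>.Filter;
        return documents
            .Select(d => new ReplaceOneModel<TestDocument>(
                f.And(f.Eq(e => e.Id, d.Id), f.Lt(e => e.Version, d.Version)), d)
            { IsUpsert = true })
            .ToList();
    }

    public static async Task UpsertAsync(
        IMongoCollection<TestDocument> collection, IEnumerable<TestDocument> documents)
    {
        try
        {
            // Unordered: a failing operation does not prevent the others from running.
            await collection.BulkWriteAsync(BuildModels(documents), new BulkWriteOptions { IsOrdered = false });
        }
        catch (MongoBulkWriteException<TestDocument> ex)
            when (ex.WriteErrors.All(e => e.Category == ServerErrorCategory.DuplicateKey))
        {
            // Assuming Id is the only unique index, each duplicate-key error means
            // a document with the same or a later Version was already persisted.
        }
    }
}
```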

Here's an xUnit test that includes some of the things I've tried and how they've failed so far. Note that, as written, it passes, but only because we're catching that MongoWriteException; the goal is an operation that simply no-ops instead of attempting to write an older-version document.

[Theory]
// insert, nothing persisted yet
[InlineData(null, 1, true)]
// update, persisted version is older
[InlineData(1, 2, true)]
// update, persisted version is same
[InlineData(2, 2, false)]
// update, persisted version is newer
[InlineData(2, 1, false)]
public async Task ConditionalUpsert(int? persistedVersion, int upsertVersion, bool shouldUpdate)
{
    // arrange
    var testCollection = GetTestCollection();
    var emptyFilter = Builders<TestDocument>.Filter.Empty;
    await testCollection.DeleteManyAsync(emptyFilter);
    if (persistedVersion.HasValue)
    {
        // seed the collection with expected version
        var testDocument = new TestDocument("Foo", persistedVersion.Value, "persisted document dummy payload");
        await testCollection.InsertOneAsync(testDocument);
    }
    var upsertDocument = new TestDocument("Foo", upsertVersion, "new document dummy payload");

    // act
    var idMatchFilter = Builders<TestDocument>.Filter.Eq(e => e.Id, upsertDocument.Id);
    var versionFilter = Builders<TestDocument>.Filter.Lt(e => e.Version, upsertDocument.Version);
    var combinedFilter = Builders<TestDocument>.Filter.And(idMatchFilter, versionFilter);

    // incorrectly updates the persisted document when trying to write older version
    //var findOneAndReplaceOptions = new FindOneAndReplaceOptions<TestDocument> { IsUpsert = true };
    //await testCollection.FindOneAndReplaceAsync(idMatchFilter, upsertDocument, findOneAndReplaceOptions);

    // incorrectly tries to insert a new document instead of doing nothing when upserting an older version
    // Command findAndModify failed: E11000 duplicate key error collection: SomeDatabase.SomeCollection. Failed _id or unique index constraint
    //var findOneAndReplaceOptions = new FindOneAndReplaceOptions<TestDocument> { IsUpsert = true };
    //await testCollection.FindOneAndReplaceAsync(combinedFilter, upsertDocument, findOneAndReplaceOptions);

    var replaceOptions = new ReplaceOptions() { IsUpsert = true };
    try
    {
        // incorrectly tries to insert new document with older version since the specified filter doesn't find the persisted version if it was newer
        await testCollection.ReplaceOneAsync(combinedFilter, upsertDocument, replaceOptions);
    }
    catch (MongoWriteException mongoWriteException) when (mongoWriteException.WriteError?.Category == ServerErrorCategory.DuplicateKey)
    {
        // expected possible error when we attempt to insert an older document - ideally this would be a no-op instead of attempted write
    }

    // assert
    var allDocuments = await testCollection.Find(emptyFilter).ToListAsync();
    var queriedDocument = Assert.Single(allDocuments);
    var expectedVersion = shouldUpdate ? upsertVersion : persistedVersion;
    Assert.Equal(expectedVersion, queriedDocument.Version);
}

Solution

  • The good news is that you can solve this by combining an upsert with an update that uses an aggregation pipeline (supported since MongoDB 4.2). In outline: filter on the id only, so that an existing document is always matched regardless of its version, and perform the version comparison inside the aggregation pipeline.

    In plain MQL, the following statement achieves this and also works for complex documents (note that $sortArray requires MongoDB 5.2 or later):

    db.collection.update({
      _id: 1
    },
    [
      {
        $set: {
          versions: {
            $sortArray: {
              input: [
                "$$ROOT",
                {
                  _id: 1,
                  "version": 4,
                  "prop": "new-value"
                }
              ],
              sortBy: {
                "version": -1
              }
            }
          }
        }
      },
      {
        $replaceWith: {
          $first: "$versions"
        }
      }
    ],
    {
      upsert: true
    })
    

    You can use this playground to test it.

    The bad news is that this statement cannot be expressed with the driver's builder methods alone. While the driver supports the $set stage as such and also the $replaceWith stage, referencing $$ROOT is not supported (at least to my knowledge).

    The following code can serve as a starting point - test and adjust it to your needs. First, create a helper record/class that contains a list of the versions:

    private record TestDocumentWithVersions(string Id, int Version, string SomeDataString, List<TestDocument> Versions)
    : TestDocument(Id, Version, SomeDataString);
    

    You can perform the update like this:

    var newVersion = doc.ToJson();
    var filter = Builders<TestDocument>.Filter.Eq(d => d.Id, doc.Id);
    var stage = BsonDocument.Parse("""
        {
          $set: {
            Versions: {
              $sortArray: {
                input: [
                  "$$ROOT",
                  %%NEW_VERSION%%
                ],
                sortBy: {
                  "Version": -1
                }
              }
            }
          }
        }
        """.Replace("%%NEW_VERSION%%", newVersion));
    var pipeline = new EmptyPipelineDefinition<TestDocument>()
        .AppendStage<TestDocument, TestDocument, TestDocumentWithVersions>(stage)
        .ReplaceWith(x => x.Versions.First());
    var update = Builders<TestDocument>.Update.Pipeline(pipeline);
    var result = await _collection.UpdateOneAsync(
        filter, 
        update, 
        new UpdateOptions
        {
            IsUpsert = true
        });
    

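    The code first serializes the document to a JSON string using the ToJson method; it then parses the $set stage from a JSON string (note the Replace call that inserts the serialized document). Finally, it sets up an aggregation pipeline whose first stage puts the existing document ($$ROOT) and the new one into an array field, Versions, sorted by the Version property in descending order; the second stage replaces the document with the first element of that array. If no document with the given Id exists, the upsert starts from a document that contains only the _id from the filter; since a missing Version sorts below any number, the new document ends up first and is inserted.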
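    To tell the three outcomes apart after UpdateOneAsync, the returned UpdateResult can be inspected: UpsertedId is set when a new document was inserted, and ModifiedCount is 0 when the pipeline left the persisted document unchanged. A minimal sketch follows; `UpsertOutcome` and `UpsertOutcomes.Classify` are hypothetical names, and it assumes the server reports a document rewritten with an identical copy as not modified, which is worth verifying (especially on Cosmos DB).

    ```csharp
    using System;
    using MongoDB.Driver;

    public enum UpsertOutcome { Inserted, Replaced, NoOp }

    // Hypothetical helper; not part of the MongoDB driver.
    public static class UpsertOutcomes
    {
        public static UpsertOutcome Classify(UpdateResult result)
        {
            // ModifiedCount is only meaningful for acknowledged results that report it.
            if (!result.IsAcknowledged || !result.IsModifiedCountAvailable)
                throw new InvalidOperationException("The outcome cannot be determined from this result.");
            return Classify(result.ModifiedCount, result.UpsertedId != null);
        }

        // Core decision, separated so it can be checked without a server.
        public static UpsertOutcome Classify(long modifiedCount, bool upserted) =>
            upserted ? UpsertOutcome.Inserted
            : modifiedCount > 0 ? UpsertOutcome.Replaced
            : UpsertOutcome.NoOp;
    }
    ```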

    As said above, this code is a sample; test and adjust it to your needs.

    Alternative approach: instead of parsing the $set stage from a string you could also set it up using a BsonDocument. For the sample, I chose the string for better readability.
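    Building on that alternative, here is a sketch that constructs the update stage as a BsonDocument instead of parsing a string, and that swaps $sortArray for a single $replaceWith with $cond: the new document wins only if the persisted Version is strictly lower, so equal versions are explicitly a no-op and the helper record with the Versions list is not needed. It relies on two assumptions worth testing: that a missing Version (the upsert-insert case, where $$ROOT holds only the _id from the filter) compares lower than any number, and that the target server supports pipeline updates at all (MongoDB 4.2+; not verified for Cosmos DB). The new document is wrapped in $literal so that string values starting with $ are not read as field paths. `VersionedUpsert` and its methods are hypothetical names.

    ```csharp
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
    using MongoDB.Bson;
    using MongoDB.Driver;

    public record TestDocument(string Id, int Version, string SomeDataString);

    // Hypothetical helper: $replaceWith + $cond instead of $sortArray.
    public static class VersionedUpsert
    {
        // Keep $$ROOT unless its Version is strictly lower than the incoming one.
        public static BsonDocument BuildStage(TestDocument doc) =>
            new BsonDocument("$replaceWith", new BsonDocument("$cond", new BsonDocument
            {
                { "if", new BsonDocument("$lt", new BsonArray { "$Version", doc.Version }) },
                // $literal keeps values such as "$foo" from being evaluated as field paths.
                { "then", new BsonDocument("$literal", doc.ToBsonDocument()) },
                { "else", "$$ROOT" }
            }));

        // Filter on Id only, so an existing document is always matched and never re-inserted.
        public static UpdateOneModel<TestDocument> BuildModel(TestDocument doc)
        {
            var filter = Builders<TestDocument>.Filter.Eq(d => d.Id, doc.Id);
            var pipeline = new BsonDocumentStagePipelineDefinition<TestDocument, TestDocument>(new[] { BuildStage(doc) });
            return new UpdateOneModel<TestDocument>(filter, Builders<TestDocument>.Update.Pipeline(pipeline))
            {
                IsUpsert = true
            };
        }

        public static Task<BulkWriteResult<TestDocument>> UpsertManyAsync(
            IMongoCollection<TestDocument> collection, IEnumerable<TestDocument> docs) =>
            collection.BulkWriteAsync(docs.Select(d => BuildModel(d)), new BulkWriteOptions { IsOrdered = false });
    }
    ```

    Because the filter matches on Id only, an existing document with the same or a later version is matched and rewritten with itself, so older versions should not produce duplicate-key errors in the bulk write.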