
Async update or insert MongoDB documents using .Net driver


I have a document like this:

public class SomeDocument
{
    public Guid Id { get; set; }
    public string PropertyA { get; set; }
    public string PropertyB { get; set; }
}

Now I have two different services (A and B) that update PropertyA and PropertyB respectively and run asynchronously. That means I don't know which service will finish first and create the document, and which one will then update it.

So, to update (or create) the document, I currently use code like this in service A:

var filter = new FilterDefinitionBuilder<SomeDocument>().Where(r => r.Id == id);
var options = new FindOneAndUpdateOptions<SomeDocument, SomeDocument>() { IsUpsert = true };
var update = Builders<SomeDocument>.Update.Set(r => r.PropertyA, "Property A value");

await Database.GetCollection<SomeDocument>("someDocuments").FindOneAndUpdateAsync(filter, update, options);

and the following code in service B:

var filter = new FilterDefinitionBuilder<SomeDocument>().Where(r => r.Id == id);
var options = new FindOneAndUpdateOptions<SomeDocument, SomeDocument>() { IsUpsert = true };
var update = Builders<SomeDocument>.Update.Set(r => r.PropertyB, "Property B value");

await Database.GetCollection<SomeDocument>("someDocuments").FindOneAndUpdateAsync(filter, update, options);

Everything looks fine, but sometimes I get the following error when both services run simultaneously:

Unhandled Exception: MongoDB.Driver.MongoCommandException: Command findAndModify failed: E11000 duplicate key error collection: someDocuments index: _id_ dup key: { : BinData(3, B85ED193195A274DA94BC86B655B4509) }.
   at MongoDB.Driver.Core.WireProtocol.CommandWireProtocol`1.ProcessReply(ConnectionId connectionId, ReplyMessage`1 reply)
   at MongoDB.Driver.Core.WireProtocol.CommandWireProtocol`1.<ExecuteAsync>d__11.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at MongoDB.Driver.Core.Servers.Server.ServerChannel.<ExecuteProtocolAsync>d__26`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at MongoDB.Driver.Core.Operations.CommandOperationBase`1.<ExecuteProtocolAsync>d__29.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at MongoDB.Driver.Core.Operations.WriteCommandOperation`1.<ExecuteAsync>d__2.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at MongoDB.Driver.Core.Operations.FindAndModifyOperationBase`1.<ExecuteAsync>d__19.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at MongoDB.Driver.OperationExecutor.<ExecuteWriteOperationAsync>d__3`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at MongoDB.Driver.MongoCollectionImpl`1.<ExecuteWriteOperationAsync>d__62`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
   at CVSP.MongoDbStore.MongoDbWriteModelFacade.<AddRecordField>d__6.MoveNext() in D:\Projects\Test\Source\MongoDbStore\WriteModel\MongoDbWriteModelFacade.cs:line 58
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.AsyncMethodBuilderCore.<>c.<ThrowAsync>b__6_1(Object state)
   at System.Threading.QueueUserWorkItemCallback.WaitCallback_Context(Object state)
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.QueueUserWorkItemCallback.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()
   at System.Threading.ThreadPoolWorkQueue.Dispatch()
   at System.Threading._ThreadPoolWaitCallback.PerformWaitCallback()

How should I insert/update documents in this case?

UPDATE

An extension method did the trick:

    public static async Task<TProjection> FindOneAndUpdateWithConcurrencyAsync<TDocument, TProjection>(
        this IMongoCollection<TDocument> collection,
        FilterDefinition<TDocument> filter,
        UpdateDefinition<TDocument> update,
        FindOneAndUpdateOptions<TDocument, TProjection> options = null,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        try
        {
            return await collection.FindOneAndUpdateAsync(filter, update, options, cancellationToken);
        }
        catch (MongoException)
        {
            // Typically the duplicate key error from a concurrent upsert.
            // Wait briefly without blocking the thread, then retry once.
            await Task.Delay(10, cancellationToken);

            return await collection.FindOneAndUpdateAsync(filter, update, options, cancellationToken);
        }
    }

Using try/catch looked strange at first glance and I didn't like it at the beginning, but after reading https://docs.mongodb.com/manual/reference/method/db.collection.findAndModify/#upsert-and-unique-index all my doubts went away.
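
The catch above retries on any MongoException, including errors that a retry cannot fix. A narrower check might look like the sketch below. It assumes the duplicate key error surfaces as a MongoCommandException with code 11000 for findAndModify (as in the stack trace above) and as a MongoWriteException for plain update or insert calls. `MongoErrors` and `IsDuplicateKey` are hypothetical names, not part of the driver.

```csharp
using System;
using MongoDB.Driver;

// Hypothetical helper (not a driver API): decides whether an exception
// is the E11000 duplicate key error that a retried upsert can resolve.
public static class MongoErrors
{
    // Server error code behind the "E11000 duplicate key error" message.
    private const int DuplicateKeyCode = 11000;

    public static bool IsDuplicateKey(Exception ex)
    {
        // FindOneAndUpdateAsync runs findAndModify, which reports failures
        // as command exceptions carrying the server error code.
        var commandException = ex as MongoCommandException;
        if (commandException != null)
        {
            return commandException.Code == DuplicateKeyCode;
        }

        // UpdateOneAsync and InsertOneAsync report write errors instead.
        var writeException = ex as MongoWriteException;
        if (writeException != null)
        {
            return writeException.WriteError != null
                && writeException.WriteError.Category == ServerErrorCategory.DuplicateKey;
        }

        return false;
    }
}
```

With this in place, the handler in the extension method could become `catch (MongoException ex) when (MongoErrors.IsDuplicateKey(ex))` (C# 6 or later), so that any other failure propagates immediately instead of being retried.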


Solution

  • Well, this is a synchronization issue, and unfortunately there is no simple solution for it. To find a workaround, let's dissect what might be happening on the backend.

    Let's assume we have two threads (services) trying to upsert a document.

    t1: 00:00:00.250 -> find document with Id (1)
    t2: 00:00:00.255 -> find document with Id (1)
    
    t1: 00:00:00.260 -> No document found
    t2: 00:00:00.262 -> No document found
    
    t1: 00:00:00.300 -> Insert a document with Id(1)
    t2: 00:00:00.300 -> Insert a document with Id(1)
    

    Bingo... we get an exception. Both threads are trying to insert a document with the same Id.

    Now, what can we do here?

    Let's turn this shortcoming to our advantage. Catch the exception and call the upsert again. This time it will successfully find the document and update it.

    I have modified the code for ServiceA and ServiceB as below and tried to upsert 10000 documents in a tight loop:

    public async Task ServiceA(Guid id)
    {
        var filter = new FilterDefinitionBuilder<SomeDocument>().Where(r => r.Id == id);
        var update = Builders<SomeDocument>.Update.Set(r => r.PropertyA, "Property A value");
    
        var options = new UpdateOptions() { IsUpsert = true };
        var database = _client.GetDatabase("stackoverflow");
        var collection = database.GetCollection<SomeDocument>(CollectionName,
            new MongoCollectionSettings
            {
                WriteConcern = WriteConcern.W1
            });
    
        await collection.UpdateOneAsync(filter, update, options);
    }
    
    public async Task ServiceB(Guid id)
    {
        var filter = new FilterDefinitionBuilder<SomeDocument>().Where(r => r.Id == id);
        var update = Builders<SomeDocument>.Update.Set(r => r.PropertyB, "Property B value");
    
        var options = new UpdateOptions() { IsUpsert = true };
        var database = _client.GetDatabase("stackoverflow");
        var collection = database.GetCollection<SomeDocument>(CollectionName,
            new MongoCollectionSettings
            {
                WriteConcern = WriteConcern.W1
            });
    
        await collection.UpdateOneAsync(filter, update, options);
    }
    

    Here is my launching code. Not perfect, but it serves the purpose.

    for (var i = 0; i < 10000; i++)
    {
        var id = Guid.NewGuid();

        // Task.Run awaits async lambdas, so WaitAll below really waits for
        // both upserts. A task built with new Task(async ...) would be
        // reported as complete at the lambda's first await.
        var tasks = new[]
        {
            Task.Run(async () =>
            {
                var p = new Program();
                try
                {
                    await p.ServiceA(id);
                }
                catch (MongoWriteException)
                {
                    // Lost the insert race: the document exists now,
                    // so the second attempt is a plain update.
                    await Task.Delay(5);
                    await p.ServiceA(id);
                }
            }),
            Task.Run(async () =>
            {
                var p = new Program();
                try
                {
                    await p.ServiceB(id);
                }
                catch (MongoWriteException)
                {
                    await Task.Delay(5);
                    await p.ServiceB(id);
                }
            })
        };

        Task.WaitAll(tasks);
    }
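
The t1/t2 timeline and the retry that resolves it can also be reproduced without a MongoDB server. The following is only a sketch under stated assumptions: `FakeCollection`, `DuplicateKeyException` and `Upserts.UpsertWithRetryAsync` are hypothetical names, not driver APIs, and the in-memory upsert merely imitates a "find, then insert" sequence guarded by a unique key.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical stand-in for the server's E11000 duplicate key error.
public sealed class DuplicateKeyException : Exception
{
    public DuplicateKeyException(Guid id) : base("duplicate key: " + id) { }
}

// Hypothetical in-memory collection. Its upsert first looks the document up
// and only later inserts it, with nothing protecting the gap in between.
public sealed class FakeCollection
{
    private readonly ConcurrentDictionary<Guid, ConcurrentDictionary<string, string>> _docs =
        new ConcurrentDictionary<Guid, ConcurrentDictionary<string, string>>();

    public async Task UpsertAsync(Guid id, string field, string value)
    {
        ConcurrentDictionary<string, string> existing;
        if (_docs.TryGetValue(id, out existing))
        {
            existing[field] = value;   // document found: update it
            return;
        }

        await Task.Delay(5);           // the window in which another writer can insert

        var fresh = new ConcurrentDictionary<string, string>();
        fresh[field] = value;
        if (!_docs.TryAdd(id, fresh))  // acts like the unique index on _id
        {
            throw new DuplicateKeyException(id);
        }
    }

    public string Get(Guid id, string field)
    {
        ConcurrentDictionary<string, string> doc;
        string value;
        if (_docs.TryGetValue(id, out doc) && doc.TryGetValue(field, out value))
        {
            return value;
        }
        return null;
    }
}

public static class Upserts
{
    // Retry once: after a duplicate key error the document is known to exist,
    // so the second attempt takes the update path.
    public static async Task UpsertWithRetryAsync(FakeCollection collection, Guid id, string field, string value)
    {
        try
        {
            await collection.UpsertAsync(id, field, value);
        }
        catch (DuplicateKeyException)
        {
            await collection.UpsertAsync(id, field, value);
        }
    }
}
```

Starting `Upserts.UpsertWithRetryAsync(collection, id, "PropertyA", ...)` and `Upserts.UpsertWithRetryAsync(collection, id, "PropertyB", ...)` together and awaiting both with `Task.WhenAll` should leave a single document holding both fields: whichever call loses the insert simply updates on its second attempt.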