locking message-queue nservicebus

Locking during an NServiceBus handler


I have a scenario where, in an NServiceBus message handler, I need to prevent multiple messages for the same saga from being processed at the same time.

For argument's sake, the handler does something like this (heavily oversimplified in this example):

Message:

public class MyMessage : IMessage {
  public int OrderId {get;set;}
  public int NewQuantityLevel {get;set;}
}

Saga:

public void Handle(MyMessage message)
{
    // call remote service to get current order quantity
    // do some logic and update remote service with difference between original and new quantity

    Bus.Send(new MyOtherMessage());
}

Now, in my process I can receive two or more of these messages at any time, and I do not want them to retrieve an order quantity that may already be in the process of being updated or modified somewhere else.

I have considered a few solutions:

  1. Obtain a mutex for the order (currently we only have one instance of the worker running on a single machine, but we may run multiple instances in the future, in which case we might use a Redis lock or something similar).
  2. Use SQL locking in the service to take a serializable lock on the rows/data (I'm not sure this would even work, however).

Neither of these really seems optimal, and it does feel like I'm working against the framework.


Solution

  • The saga is the lock.

    As @Hadi mentions, NServiceBus will use optimistic concurrency to make sure that only a single message gets to update a saga instance at a time.

    Instead of doing the update directly in the saga, store the fact that you are doing the update and send a message to do the remote service invocation to a separate message handler in a different endpoint. Storing the fact on the saga and sending the message to perform it will either BOTH be done or not at all. If two messages try to do this at the same time, only one will complete successfully. The other message will get a concurrency exception, roll back to the queue and eventually be retried.
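
    The behaviour described above can be illustrated with a small stand-alone sketch. It does not use NServiceBus at all: `InMemorySagaStore`, `SagaData`, `ConcurrencyException` and `TryStartUpdate` are hypothetical names invented for this illustration, and the version check only approximates what a real saga persister does. "Sending" the message is represented simply by returning `true` after a successful save.

    ```csharp
    using System;
    using System.Collections.Generic;

    // Hypothetical stand-ins for illustration only; these are NOT NServiceBus types.
    public class SagaData
    {
        public int OrderId;
        public bool CurrentlyUpdatingQuantity;
        public int Version; // incremented on every successful save
    }

    public class ConcurrencyException : Exception { }

    public class InMemorySagaStore
    {
        private readonly Dictionary<int, SagaData> rows = new Dictionary<int, SagaData>();
        private readonly object gate = new object();

        public InMemorySagaStore(int orderId)
        {
            rows[orderId] = new SagaData { OrderId = orderId };
        }

        private static SagaData Clone(SagaData d)
        {
            return new SagaData { OrderId = d.OrderId, CurrentlyUpdatingQuantity = d.CurrentlyUpdatingQuantity, Version = d.Version };
        }

        // Every message handler works on its own copy of the saga state.
        public SagaData Load(int orderId)
        {
            lock (gate) { return Clone(rows[orderId]); }
        }

        // The save succeeds only if nobody else saved since this copy was loaded.
        public void Save(SagaData copy)
        {
            lock (gate)
            {
                if (rows[copy.OrderId].Version != copy.Version)
                    throw new ConcurrencyException();
                var stored = Clone(copy);
                stored.Version++;
                rows[copy.OrderId] = stored;
            }
        }
    }

    public static class Demo
    {
        // Returns true if this attempt recorded the update (and would send the message).
        public static bool TryStartUpdate(InMemorySagaStore store, SagaData loaded)
        {
            if (loaded.CurrentlyUpdatingQuantity)
                return false; // an update is already in flight
            loaded.CurrentlyUpdatingQuantity = true;
            store.Save(loaded); // throws ConcurrencyException if the copy is stale
            return true;
        }

        public static void Main()
        {
            var store = new InMemorySagaStore(42);

            // Two messages load the same saga state before either one saves.
            var first = store.Load(42);
            var second = store.Load(42);

            Console.WriteLine(TryStartUpdate(store, first)); // True

            try { TryStartUpdate(store, second); }
            catch (ConcurrencyException) { Console.WriteLine("second message rolled back"); }

            // On retry the second message reloads the saga and sees the flag.
            Console.WriteLine(TryStartUpdate(store, store.Load(42))); // False
        }
    }
    ```

    In the real framework the rollback and retry are handled for you; the sketch only shows why the losing message cannot overwrite the flag set by the winner.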

    At that time it will see that there is already a quantity update operation happening. You can then either discard the second message or store some state on the saga to make sure the second quantity update happens once the first completes.

    Moving the remote service invocation outside the saga together with full-duplex request/response messaging ensures a good separation of concerns between the saga, as the process manager, and the message handler, as the integration point.

    Pseudo code

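    public class MySaga
    {
        public void Handle(MyMessage message)
        {
            // An update for this order is already in flight
            if(Data.CurrentlyUpdatingQuantity)
                return; //or schedule for later

            // Record the fact that an update has started; this is saved
            // together with the outgoing message, or not at all
            Data.CurrentlyUpdatingQuantity = true;

            Bus.Send(new PerformQuantityUpdateMessage(message.OrderId));
        }

        public void Handle(QuantityUpdateResponse message)
        {
            // The remote update has completed; allow the next one
            Data.CurrentlyUpdatingQuantity = false;
            Bus.Send(new MyOtherMessage());
        }
    }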
    

    Separate message handler (NOT PART OF THE SAGA)

    public void Handle(PerformQuantityUpdateMessage message)
    {
        // call remote service to get current order quantity
        // do some logic and update remote service with difference between original and new quantity
    
        Bus.Reply(new QuantityUpdateResponse(message.OrderId));    
    }
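
    If the second message should be applied later rather than discarded, the "store some state on the saga" option might look roughly like the following stand-alone sketch. It is not tied to NServiceBus: the `Sent` list stands in for `Bus.Send`, and `PendingQuantityLevel` and the `NewQuantityLevel` property on `PerformQuantityUpdateMessage` are hypothetical additions. It also assumes that only the most recent pending quantity matters.

    ```csharp
    using System;
    using System.Collections.Generic;

    // Plain message classes for the sketch; the NewQuantityLevel on
    // PerformQuantityUpdateMessage is a hypothetical addition.
    public class MyMessage { public int OrderId; public int NewQuantityLevel; }
    public class PerformQuantityUpdateMessage { public int OrderId; public int NewQuantityLevel; }
    public class QuantityUpdateResponse { public int OrderId; }
    public class MyOtherMessage { }

    public class MySagaData
    {
        public bool CurrentlyUpdatingQuantity;
        public int? PendingQuantityLevel; // hypothetical: latest update waiting its turn
    }

    public class MySagaSketch
    {
        public readonly MySagaData Data = new MySagaData();
        public readonly List<object> Sent = new List<object>(); // stand-in for Bus.Send

        public void Handle(MyMessage message)
        {
            if (Data.CurrentlyUpdatingQuantity)
            {
                // Remember the request instead of discarding it (last one wins).
                Data.PendingQuantityLevel = message.NewQuantityLevel;
                return;
            }

            Data.CurrentlyUpdatingQuantity = true;
            Sent.Add(new PerformQuantityUpdateMessage { OrderId = message.OrderId, NewQuantityLevel = message.NewQuantityLevel });
        }

        public void Handle(QuantityUpdateResponse message)
        {
            Sent.Add(new MyOtherMessage());

            if (Data.PendingQuantityLevel.HasValue)
            {
                // Start the deferred update; the flag stays set while it runs.
                Sent.Add(new PerformQuantityUpdateMessage { OrderId = message.OrderId, NewQuantityLevel = Data.PendingQuantityLevel.Value });
                Data.PendingQuantityLevel = null;
                return;
            }

            Data.CurrentlyUpdatingQuantity = false;
        }
    }

    public static class PendingDemo
    {
        public static void Main()
        {
            var saga = new MySagaSketch();
            saga.Handle(new MyMessage { OrderId = 1, NewQuantityLevel = 5 });
            saga.Handle(new MyMessage { OrderId = 1, NewQuantityLevel = 8 }); // deferred
            saga.Handle(new QuantityUpdateResponse { OrderId = 1 });          // triggers the deferred update
            saga.Handle(new QuantityUpdateResponse { OrderId = 1 });          // saga becomes idle again
            Console.WriteLine(saga.Sent.Count); // 4
        }
    }
    ```

    Keeping only the latest pending quantity is a design choice made for the sketch; if every intermediate update matters, a queue of pending quantities on the saga data would be needed instead.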