Search code examples
c#semaphore

Concurrent entrancy occurs despite of use of SemaphoreSlim


I expected the following Connected and Disconnected handlers are called alternately.

But it wasn't. It is not 100% reproducible but sometimes the test failed. Repository is here.

I attach simplified code here because full code will be long. Please refer to the repository for the reproducible example.

  public class ClientStressTest3 {
    [Fact]
    public async Task TestAsync() {
      var client = new Client();
      int openCloseDifference = 0;
      var failures = Channel.CreateUnbounded<string>();

      client.Connected += () => {
        Interlocked.Increment(ref openCloseDifference);
        int difference = openCloseDifference;
        Debug.WriteLine("Connected: {}", difference);
        if (Math.Abs(difference) > 1) {
          // Failure point. Why enter here?
          _ = failures.Writer.WriteAsync($"open close difference {difference}");
        }
      };
      client.Disconnected += () => {
        Interlocked.Decrement(ref openCloseDifference);
        int difference = openCloseDifference;
        Debug.WriteLine("Disconnected: {}", difference);
        if (Math.Abs(difference) > 1) {
          _ = failures.Writer.WriteAsync($"open close difference {difference}");
        }
      };

      var tasks = new List<Task>();
      for (int i = 0; i < 625; i++) {
        tasks.Add(Task.Run(async () => {
          try {
            await client.ConnectAsync().ConfigureAwait(false);
          }
          catch (Exception ex) { }
        }));
        tasks.Add(Task.Run(async () => {
          try {
            await client.CloseAsync().ConfigureAwait(false);
          }
          catch (Exception ex) { }
        }));
      }
      await Task.WhenAll(tasks).ConfigureAwait(false);
      while (await failures.Reader.WaitToReadAsync().ConfigureAwait(false)) {
        string failure = await failures.Reader.ReadAsync().ConfigureAwait(false);
        throw new Exception(failure);
      }
    }
  }

  class Client {
    public event Action Connected = delegate { };
    public event Action Disconnected = delegate { };

    private readonly SemaphoreSlim _connectSemaphore = new(1);
    private Task _dispatch = Task.CompletedTask;
    private WebSocket _clientSocket;

    public async Task ConnectAsync() {
      await _connectSemaphore.WaitAsync().ConfigureAwait(false);
      // _dispatch may be same among multiple threads if restored at the same time.
      try {
        await _clientSocket.ConnectAsync().ConfigureAwait(false);
        if (_dispatch != null) {
          _dispatch = _dispatch.ContinueWith((_) => DispatchEventAsync(), TaskScheduler.Default);
        }
      }
      finally {
        _connectSemaphore.Release();
      }
    }

    private async Task DispatchEventAsync() {
      try {
        Connected();
      }
      catch (Exception exception) { }
      try {
        while (await events.WaitToReadAsync().ConfigureAwait(false)) {
          DispatchEvent(await events.ReadAsync().ConfigureAwait(false));
        }
      }
      catch (Exception exception) { }
      try {
        Disconnected();
      }
      catch (Exception ex) { }
    }
  }

I think _connectSemaphore will guard the inner code execution, and only one thread will execute DispatchEventAsync.

But it seems sometimes two threads enter DispatchEventAsync at the same time. I can't understand this.


Solution

  • ContinueWith is a low-level method with dangerous default behavior (link is to my blog). In this case, your code is not behaving how you think it should because ContinueWith (like StartNew) doesn't understand asynchronous delegates.

    Specifically, the task returned from ContinueWith (the same task stored in _dispatch) will complete when DispatchEventAsync asynchronously yields (i.e., hits its first await that asynchronously waits). This is likely the call to WaitToReadAsync, which is after Connected and before Disconnected. So the _dispatch task completes after Connected and before Disconnected.

    Ideally, you should avoid ContinueWith completely. Sometimes a local asynchronous method helps, e.g.:

    public async Task ConnectAsync() {
      await _connectSemaphore.WaitAsync().ConfigureAwait(false);
      try {
        await _clientSocket.ConnectAsync().ConfigureAwait(false);
        if (_dispatch != null) {
          _dispatch = ChainAsync(_dispatch);
        }
      }
      finally {
        _connectSemaphore.Release();
      }
    
      static async Task ChainAsync(Task dispatch)
      {
        await dispatch;
        await DispatchEventAsync();
      }
    }
    

    If you do want to continue using ContinueWith for some reason, then you can use Unwrap:

    public async Task ConnectAsync() {
      await _connectSemaphore.WaitAsync().ConfigureAwait(false);
      try {
        await _clientSocket.ConnectAsync().ConfigureAwait(false);
        if (_dispatch != null) {
          _dispatch = _dispatch.ContinueWith(_ => DispatchEventAsync(), TaskScheduler.Default)
              .Unwrap();
        }
      }
      finally {
        _connectSemaphore.Release();
      }
    }
    

    Using either of these approaches, the _dispatch task will now complete at the end of DispatchEventAsync.