Search code examples
c#.net-coresemaphoresystem.threading.channels

Is SemaphoreSlim needed when Channel's SingleReader is set to true


When sending data fast enough, InvalidOperationException is being thrown: 'There is already one outstanding 'SendAsync' call for this WebSocket instance. ClientWebSocket.ReceiveAsync and ClientWebSocket.SendAsync can be called simultaneously, but at most one outstanding operation for each of them is allowed at the same time.

The first example uses SemaphoreSlim which allows only one message at a time that prevents that issue from happening.

The question is do I need to do the same SemaphoreSlim workaround in the second example since SingleReader = true is specified in the channel options? It should basically be the same, but I would like someone to confirm it, so there are no surprises.

Without System.Threading.Channels

private readonly SemaphoreSlim sendAsyncSemaphore = new SemaphoreSlim(1);

public async Task<bool> SendMessageAsync(ReadOnlyMemory<byte> message)
{
    if (_webSocket.State != WebSocketState.Open)
    {
        return false;
    }
    
    // Only one message can be sent at a time. Wait until a message can be sent.
    await sendAsyncSemaphore.WaitAsync().ConfigureAwait(false);
    
    // We can now send a message. Our thread has entered sendAsyncSemaphore, so we should be the one to release it.
    try
    {
        await _Client.SendAsync(message, WebSocketMessageType.Text, true, connectionCancellation.Token).ConfigureAwait(true);
    }
    catch (TaskCanceledException)
    {
        sendAsyncSemaphore.Release();
        return false;
    }
    
    // Our thread can now release the sendAsyncSemaphore so another message can be sent.
    sendAsyncSemaphore.Release();
    return true;
}

public void Dispose()
{
    sendAsyncSemaphore.Dispose();
}

With System.Threading.Channels

public class WebSocketClient
{
    private readonly WebSocket _webSocket;
    private readonly Channel<string> _input;
    private readonly Channel<string> _output;

    public WebSocketClient(WebSocket webSocket)
    {
        _webSocket = webSocket;

        _input = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
        {
            SingleWriter = true
        });

        _output = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
        {
            SingleReader = true
        });
    }

    private async Task SendLoopAsync(CancellationToken cancellationToken)
    {
        await foreach (var message in _output.Reader.ReadAllAsync(cancellationToken))
        {
            if (_webSocket.State != WebSocketState.Open)
            {
                return;
            }

            var bytesToSend = new ArraySegment<byte>(Encoding.UTF8.GetBytes(message));
            await _webSocket.SendAsync(bytesToSend, WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false);
        }
    }
}

Solution

  • Short answer yes! You still need something to limit the number of outstanding read calls when SingleReader = true. The documentation clearly states, you set that property to true 'if readers to the channel guarantee that there will only ever be at most one read operation at a time'. So the property does not provide limitation on the senders.

    Using a semaphore or any other synchronization primitive is not a work around in my opinion. If you take a look at the code, you'll see that setting the SingleReader option gets you a SingleConsumerUnboundedChannel. If you don't limit the readers yourself you'll end up unintentionally cancelling read operations.