c# .net system.threading.channels

Does ChannelReader<T>.ReadAllAsync throw any exceptions when being canceled by a CancellationToken?


Does ChannelReader<T>.ReadAllAsync throw any exceptions when it is canceled through a CancellationToken? It doesn't seem to throw OperationCanceledException or TaskCanceledException.

I know that if these two methods were called in a fire-and-forget manner, i.e. _ = SendLoopAsync(); _ = ReceiveLoopAsync();, the task would fail silently, with no message or exception displayed, because the tasks are never awaited and their exceptions are lost.

I don't want that task to crash or be cancelled without my knowing about it, which means I should probably wrap the whole body of SendLoopAsync in a try/catch rather than only the code between the braces of the ReadAllAsync loop.

A small example representing its behavior will be appreciated.

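using System;
using System.Buffers;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var clientWebSocket = new ClientWebSocket();
await clientWebSocket.ConnectAsync(new Uri("wss://www.deribit.com/ws/api/v2"), CancellationToken.None).ConfigureAwait(false);

// ChannelExample is the class defined below
var client = new ChannelExample(clientWebSocket);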

for (var i = 1; i <= 10; i++)
{
    client.Output.TryWrite($"Item: {i}");
}

var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(1));
await client.StartAsync(cts.Token).ConfigureAwait(false); // blocks the UI

Console.ReadLine();

public class ChannelExample
{
    private readonly WebSocket _webSocket;
    private readonly Channel<string> _input;
    private readonly Channel<string> _output;

    public ChannelExample(WebSocket webSocket)
    {
        _webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket));

        _input = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
        {
            SingleWriter = true
        });

        _output = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
        {
            SingleReader = true
        });
    }

    public ChannelReader<string> Input => _input.Reader;
    public ChannelWriter<string> Output => _output.Writer;

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var receiving = ReceiveLoopAsync(cancellationToken);
        var sending = SendLoopAsync(cancellationToken);

        var completedTask = await Task.WhenAny(receiving, sending).ConfigureAwait(false);

        if (completedTask.Exception != null)
        {
            Console.WriteLine("Exception");
        }
    }

    private async Task SendLoopAsync(CancellationToken cancellationToken)
    {
        await foreach (var message in _output.Reader.ReadAllAsync(cancellationToken))
        {
            Console.WriteLine($"Sending: {message}");
            await Task.Delay(5000, cancellationToken).ConfigureAwait(false);
        }
    }

    private async Task ReceiveLoopAsync(CancellationToken cancellationToken)
    {
        using var buffer = MemoryPool<byte>.Shared.Rent();

        while (_webSocket.State == WebSocketState.Open && !cancellationToken.IsCancellationRequested)
        {
            ValueWebSocketReceiveResult receiveResult;
            do
            {
                receiveResult = await _webSocket.ReceiveAsync(buffer.Memory, cancellationToken).ConfigureAwait(false);

                if (receiveResult.MessageType == WebSocketMessageType.Close)
                {
                    return;
                }
            } while (!receiveResult.EndOfMessage);
        }
    }
}

Solution

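  • It does throw: cancelling the token surfaces an OperationCanceledException (or the derived TaskCanceledException) from the await foreach, which is the general expected pattern in this scenario. You can always confirm that with a quick test. So you would wrap the loop with: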

    try
    {
        // ...
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        // treat as completion; swallow
    }
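
Since the question asks for a small example of the behavior, here is a minimal sketch of the try/catch pattern above. It assumes an empty unbounded channel and a 200 ms timeout chosen purely for illustration; DrainAsync is a hypothetical helper, not something from the question's code.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// An empty channel: the reader has nothing to yield, so it waits until the token fires.
var channel = Channel.CreateUnbounded<string>();
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));

// Hypothetical helper: drains the channel and reports how the loop ended.
static async Task<string> DrainAsync(ChannelReader<string> reader, CancellationToken cancellationToken)
{
    try
    {
        await foreach (var message in reader.ReadAllAsync(cancellationToken))
        {
            Console.WriteLine($"Read: {message}");
        }

        return "completed"; // the writer was completed and the channel was drained
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        return "canceled"; // the token fired while the loop was waiting for data
    }
}

var outcome = await DrainAsync(channel.Reader, cts.Token);
Console.WriteLine(outcome);
```

Because nothing is ever written to the channel and the writer is never completed, the loop should end through the catch block once the timeout elapses.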
    

    Alternatively: you could pass CancellationToken.None into the channel read API, and just use the writer's completion to signify exit (making sure that you call .Complete(...) on the writer when exiting).
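
A minimal sketch of that alternative, assuming a single background consumer and two illustrative items (the names consumer and received are made up for this example): the reader is given CancellationToken.None, and completing the writer is what ends the loop.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<string>();
var received = new List<string>();

// Consumer: no cancellable token is passed, so the loop ends only when the writer completes.
var consumer = Task.Run(async () =>
{
    await foreach (var message in channel.Reader.ReadAllAsync(CancellationToken.None))
    {
        received.Add(message);
    }
});

channel.Writer.TryWrite("Item: 1");
channel.Writer.TryWrite("Item: 2");

// Signals "no more items"; items already buffered are still delivered before the loop exits.
channel.Writer.Complete();

await consumer; // finishes normally, with no OperationCanceledException involved
Console.WriteLine(string.Join(", ", received));
```

With this approach the exit path is ordinary completion, so there is no cancellation exception to catch, but the code that decides to stop has to be the one that calls Complete on the writer.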

    That said: ReadAllAsync probably isn't the preferred API here, since you don't really need it as IAsyncEnumerable<T> - so it may be preferable to use the native channel API, i.e.

    while (await _output.Reader.WaitToReadAsync(cancellationToken))
    {
        while (_output.Reader.TryRead(out var message))
        {
            // ...
        }
    }
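
One thing that may explain why no exception shows up in the question's StartAsync: a task that ends through cancellation finishes in the Canceled state, so its Exception property stays null and the `completedTask.Exception != null` check never fires. The sketch below illustrates this with a stand-in for SendLoopAsync built on the WaitToReadAsync/TryRead loop; the empty channel and the 200 ms timeout are assumptions made for the demonstration.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<string>();
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));

// Stand-in for the question's SendLoopAsync, using the WaitToReadAsync/TryRead pattern.
static async Task SendLoopAsync(ChannelReader<string> reader, CancellationToken cancellationToken)
{
    while (await reader.WaitToReadAsync(cancellationToken))
    {
        while (reader.TryRead(out var message))
        {
            Console.WriteLine($"Sending: {message}");
        }
    }
}

var sending = SendLoopAsync(channel.Reader, cts.Token);
var completedTask = await Task.WhenAny(sending);

// A cancelled async method ends in the Canceled state, so Exception stays null.
Console.WriteLine($"IsCanceled: {completedTask.IsCanceled}, Exception is null: {completedTask.Exception is null}");

var observed = false;
try
{
    await completedTask; // awaiting the task itself rethrows the cancellation
}
catch (OperationCanceledException)
{
    observed = true;
    Console.WriteLine("Send loop was cancelled");
}
```

Awaiting the task returned by Task.WhenAny (or checking IsCanceled alongside Exception) is one way to make sure cancellation is reported rather than silently ignored.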