Search code examples
c#.net-corewebsocketsystem.io.pipelines

Gracefully closing pipe web sockets


I'm trying to create a pipe web socket client, which is going to pull data off exchanges. The code is something between David Fowler's BedrockFramework and @StephenCleary's TCP Chat. I'm not sure if there is a difference between IDuplexPipe (David's way) and normal pipes (Stephen's way), I believe they are two ways to express same thing.

The question is how do I gracefully stop/close everything, i.e. basically I want to create a StopAsync method? Side question, what is the reason that they don't use a CancellationToken?

public sealed class PipeSocketClient
{
    private readonly string _baseUrl;

    private readonly Pipe _inputPipe;

    private readonly Pipe _outputPipe;

    private readonly ClientWebSocket _webSocket;

    private volatile bool _aborted;

    public PipeSocketClient(string baseUrl, PipeOptions? pipeOptions = default)
    {
        _baseUrl = baseUrl;

        _webSocket = new ClientWebSocket();

        _inputPipe = new Pipe(pipeOptions ?? PipeOptions.Default);
        _outputPipe = new Pipe();
    }

    public async ValueTask<PipeSocketClient> StartAsync()
    {
        await _webSocket.ConnectAsync(new Uri(_baseUrl), CancellationToken.None).ConfigureAwait(false);

        _ = ExecuteAsync(_webSocket, _inputPipe.Writer, _outputPipe.Reader);

        return this;
    }

    private async Task ExecuteAsync(WebSocket webSocket, PipeWriter pipeWriter, PipeReader pipeReader)
    {
        Exception? sendError = null;
        try
        {
            // Spawn send and receive logic
            var receiveTask = DoReceiveAsync(webSocket, pipeWriter);
            var sendTask = DoSendAsync(webSocket, pipeReader);

            // If the sending task completes then close the receive
            // We don't need to do this in the other direction because the kestrel
            // will trigger the output closing once the input is complete.
            if (await Task.WhenAny(receiveTask, sendTask).ConfigureAwait(false) == sendTask)
            {
                // Tell the reader it's being aborted
                webSocket.Dispose();
            }

            // Now wait for both to complete
            await receiveTask.ConfigureAwait(false);
            sendError = await sendTask.ConfigureAwait(false);

            // Dispose the socket(should noop if already called)
            webSocket.Dispose();
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Unexpected exception: {ex.Message}");
        }
        finally
        {
            // Complete the output after disposing the socket
            await pipeReader.CompleteAsync(sendError).ConfigureAwait(false);
        }
    }

    private async Task DoReceiveAsync(WebSocket webSocket, PipeWriter pipeWriter)
    {
        Exception? error = null;

        try
        {
            await ProcessReceivesAsync(webSocket, pipeWriter).ConfigureAwait(false);
        }
        catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset)
        {
            error = new ConnectionResetException(ex.Message, ex);
        }
        catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted ||
                                         ex.SocketErrorCode == SocketError.ConnectionAborted ||
                                         ex.SocketErrorCode == SocketError.Interrupted ||
                                         ex.SocketErrorCode == SocketError.InvalidArgument)
        {
            if (!_aborted)
            {
                // Calling Dispose after ReceiveAsync can cause an "InvalidArgument" error on *nix.
                error = new ConnectionAbortedException();
            }
        }
        catch (ObjectDisposedException)
        {
            if (!_aborted)
            {
                error = new ConnectionAbortedException();
            }
        }
        catch (IOException ex)
        {
            error = ex;
        }
        catch (Exception ex)
        {
            error = new IOException(ex.Message, ex);
        }
        finally
        {
            if (_aborted)
            {
                error ??= new ConnectionAbortedException();
            }

            await pipeWriter.CompleteAsync(error).ConfigureAwait(false);
        }
    }

    private async Task ProcessReceivesAsync(WebSocket webSocket, PipeWriter pipeWriter)
    {
        while (true)
        {
            // Ensure we have some reasonable amount of buffer space
            var buffer = pipeWriter.GetMemory();

            var bytesRead = await webSocket.ReceiveAsync(buffer, CancellationToken.None).ConfigureAwait(false);

            if (bytesRead.Count == 0)
            {
                break;
            }

            pipeWriter.Advance(bytesRead.Count);

            var flushTask = pipeWriter.FlushAsync();

            if (!flushTask.IsCompleted)
            {
                await flushTask.ConfigureAwait(false);
            }

            var result = flushTask.Result;
            if (result.IsCompleted)
            {
                // Pipe consumer is shut down, do we stop writing
                break;
            }
        }
    }

    private async Task<Exception?> DoSendAsync(WebSocket webSocket, PipeReader pipeReader)
    {
        Exception? error = null;

        try
        {
            await ProcessSendsAsync(webSocket, pipeReader).ConfigureAwait(false);
        }
        catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted)
        {
            error = null;
        }
        catch (ObjectDisposedException)
        {
            error = null;
        }
        catch (IOException ex)
        {
            error = ex;
        }
        catch (Exception ex)
        {
            error = new IOException(ex.Message, ex);
        }
        finally
        {
            _aborted = true;
            webSocket.Abort();
        }

        return error;
    }

    private async Task ProcessSendsAsync(WebSocket webSocket, PipeReader pipeReader)
    {
        while (true)
        {
            // Wait for data to write from the pipe producer
            var result = await pipeReader.ReadAsync().ConfigureAwait(false);
            var buffer = result.Buffer;

            if (result.IsCanceled)
            {
                break;
            }

            var end = buffer.End;
            var isCompleted = result.IsCompleted;
            if (!buffer.IsEmpty)
            {
                await webSocket.SendAsync(buffer, WebSocketMessageType.Text).ConfigureAwait(false);
            }

            pipeReader.AdvanceTo(end);

            if (isCompleted)
            {
                break;
            }
        }
    }
}

Solution

  • I haven't looked at David Fowler's framework in detail yet, but I would expect he uses the same completion semantics that mine does: when the output (socket writing) pipe completes, then the (web)socket is closed.

    In your code, ProcessSendsAsync will return when the pipeline is completed, and this will cause DoSendAsync to call webSocket.Abort. I assume you should modify DoSendAsync to call CloseAsync when the pipeline is completed without an exception, and only call Abort when it there is an exception.

    If you haven't seen it yet, this is the video where I discuss how I handle shutdowns, both exceptional and graceful.