Search code examples
.netsystem.io.pipelines

How to detect when a PipeReader has reached the end of the data source (end of pipe)?


We can call ReadAsync() and examine a buffer for read bytes...

PipeReader reader = ...;
ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
long availableBytes = buffer.Length

If the length does not increase from before the call to ReadAsync, does that indicate the end of the pipe (no more bytes to read)? If not then what is the correct way of detecting 'end of pipe'?

We can signal consumption of bytes in the buffer like so:

reader.AdvanceTo(count);

And then check if there are any unconsumed bytes, or the possibility of future bytes being supplied (i.e. the producer has not yet signalled that it has stopped adding new bytes to the pipe)

readResult.IsCompleted

But if I am looking for a sequence (or sequences) in the buffer and waiting for a complete sequence before consuming it, then IsComplete appears to remain false even when the buffer contains all of the available bytes, and the producer has signalled completion.

Thanks.


Solution

  • .IsCompleted does indicate the end of the pipe, in the sense of a socket etc being closed (as opposed to being open but not having any more data right now); I expect what is happening here is that you're:

    • fetching a read buffer
    • reading the entire buffer looking for a sequence, and not finding it
    • therefore processing zero bytes
    • and therefore saying .AdvanceTo(zero)

    There's an important second overload of AdvanceTo - you shouldn't just tell it what you consumed; you should tell it what you examined, which in this case is probably: everything; doing this can avoid you getting stuck in a hot loop parsing the same incomplete frame over and over and over and over and over. For example, one of my read loops looks like (simplified):

    while (true)
    {
        var readResult = await input.ReadAsync();
        var buffer = readResult.Buffer;
        int handled = TryConsume(ref buffer); // note: this **changes** buffer, slicing (Slice)
        // data from the start; when TryConsume exits, it will contain everything
        // that is *left*, but we will have effectively examined all of it; we will
        // have consumed any complete frames that we can from it
    
        // advance the pipe
        input.AdvanceTo(buffer.Start, buffer.End);
    
        // exit if we aren't making progress, and nothing else will be forthcoming
        if (handled == 0 && readResult.IsCompleted)
        {
            break; // no more data, or trailing incomplete messages
        }
    }