How to implement the correct ReadLine() using System.IO.Pipelines and SequenceReader
I have implemented ReadLine() in C# using Pipe and SequenceReader.
The result of ReadLine() is not handled correctly when the PipeReader input is fragmented.
using System.Buffers;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Text;
internal class Program
{
public static async Task Main()
{
var pipe = new Pipe();
var writer = pipe.Writer;
var reader = pipe.Reader;
var index = 0;
while (true)
{
// Fragment simulation
if (index < Data.Length)
{
var data = Encoding.UTF8.GetBytes(Data[index]);
await writer.WriteAsync(data);
writer.Advance(data.Length);
index++;
}
else
{
await pipe.Writer.CompleteAsync();
}
// Read
var result = await reader.ReadAsync();
var buffer = result.Buffer;
while (ReadLine(ref buffer) is { } str)
{
Debug.WriteLine(str);
}
// ?
reader.AdvanceTo(buffer.Start, buffer.End);
if (result.IsCompleted || result.IsCanceled)
{
break;
}
}
await reader.CompleteAsync();
}
private static string? ReadLine(ref ReadOnlySequence<byte> buffer)
{
var reader = new SequenceReader<byte>(buffer);
if (reader.TryReadTo(out ReadOnlySpan<byte> line, "\r\n"u8))
{
buffer = buffer.Slice(reader.Position);
return Encoding.UTF8.GetString(line);
}
return default;
}
private static readonly string[] Data =
{
"aaa",
"bbb",
"ccc\r\n",
"xxx",
"\r\n",
"yyy",
"\r",
"\n"
};
}
"aaabbbccc"
"xxx"
"yyy"
"aaa\0\0\0bbb\0\0\0ccc"
"\0\0\0\0\0xxx\0\0\0"
I think I am using PipeReader.AdvanceTo() incorrectly, what would be the correct description?
Note that until a newline is found, we want the buffer to be managed in the PipeReader.
I do not want an answer that uses StringBuilder to process up to the part that has been read.
The WriteAsync
method will automatically advance the write position. The Advance
method is used when using other methods to write data.
await writer.WriteAsync(data);
//writer.Advance(data.Length);
For example this is an equivalent method to WriteAsync
but using Advance
:
data.AsSpan().CopyTo(writer.GetSpan(data.Length));
writer.Advance(data.Length);
await writer.FlushAsync();