Tags: c#, wcf, streaming, openai-api

Streaming WCF response


I have a WCF web service which calls a generative model using the OpenAI C# client. I want to stream this response, as it comes in, to the WCF client. My current transfer mode is set to "Streamed" with a closeTimeout of 1 second. A simplified version of the code that the service currently runs is the following:

var collectionResult = openAiClient.CompleteChatStreaming(new[] { userMsg }, options: modelOptions);
var stream = new MemoryStream();
var sw = new StreamWriter(stream)
{
    AutoFlush = true
};

Task.Run(async () =>
{
    foreach (var col in collectionResult)
        foreach (var data in col.ContentUpdate)
            sw.Write(data.Text);
});
stream.Position = 0;
return stream;

When debugging this, I see that before the outer loop even begins, the connection is closed, and I can't figure out why. This is a snippet of my client code using the returned stream:

using (var sr = new StreamReader(stream))
{
    var data = "";
    while ((data = await sr.ReadLineAsync()) != null)
    {
        Console.WriteLine(data);
    }
}

Is there even a way to achieve this kind of streaming response in WCF where the server is a kind of middleman between data being streamed in and data being streamed out? How would it be implemented correctly? The client needs to be able to see the model output as if they were chatting with a normal chatbot, so I cannot wait for the entire response to be generated before responding.


Solution

  • This code has a producer and consumer acting on a single MemoryStream. This isn't going to work:

    • a MemoryStream only has one Position - not a separate read and write position
    • it has no concurrency protection for the case where a read and a write happen at the same time
    • EOF is implicit by simply reaching the end, which is immediately true when nothing has been written, or when the reader has simply "caught up"
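
The single-Position and implicit-EOF points can be seen in a small, self-contained sketch. The class and method names (MemoryStreamDemo, Run) are made up for illustration and are not part of the question's code:

```csharp
using System;
using System.IO;
using System.Text;

public static class MemoryStreamDemo
{
    // Hypothetical demo: returns how many bytes a Read yields
    // before and after rewinding the one shared Position.
    public static (int beforeRewind, int afterRewind) Run()
    {
        var ms = new MemoryStream();
        var payload = Encoding.UTF8.GetBytes("hello");

        // Writing advances Position to 5; there is no separate read cursor.
        ms.Write(payload, 0, payload.Length);

        var buffer = new byte[16];

        // Position is already at the end, so Read returns 0,
        // which a reader interprets as end of stream.
        int beforeRewind = ms.Read(buffer, 0, buffer.Length);

        // Rewinding makes the data readable, but it also moves the place
        // where the next Write would land.
        ms.Position = 0;
        int afterRewind = ms.Read(buffer, 0, buffer.Length);

        return (beforeRewind, afterRewind);
    }
}
```

Here the first Read returns 0 and the second returns 5. In the question's code, the stream is returned while it is still empty, so the consumer can hit that same 0-byte "end of stream" before the background task has written anything.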

    I think what you want here is Pipe (from System.IO.Pipelines), not MemoryStream. A Pipe is a different API with separate producer and consumer primitives, back-pressure support (to prevent over-buffering), and explicit EOF; it is intended to be used as a buffer for exactly this scenario.
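
A minimal sketch of the Pipe approach, under some assumptions: the System.IO.Pipelines package is referenced, and the hypothetical `chunks` parameter stands in for the text fragments coming out of the model's streaming result (the `data.Text` values in the question). The names PipeSketch and StreamChunks are made up for illustration. Whether WCF flushes each chunk to the client promptly also depends on binding and transfer-mode configuration, which this sketch does not cover.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

public static class PipeSketch
{
    // Hypothetical helper: 'chunks' is a stand-in for the model's text updates.
    public static Stream StreamChunks(IEnumerable<string> chunks)
    {
        var pipe = new Pipe();

        // Producer: runs in the background and writes into the pipe.
        _ = Task.Run(async () =>
        {
            Exception error = null;
            try
            {
                foreach (var chunk in chunks)
                {
                    byte[] bytes = Encoding.UTF8.GetBytes(chunk);

                    // WriteAsync also flushes; it waits if the reader
                    // has fallen too far behind (back-pressure).
                    FlushResult result = await pipe.Writer.WriteAsync(bytes);

                    // The reader has gone away, so stop producing.
                    if (result.IsCompleted) break;
                }
            }
            catch (Exception ex)
            {
                error = ex;
            }
            finally
            {
                // Explicit EOF: the reader only sees end of stream here.
                await pipe.Writer.CompleteAsync(error);
            }
        });

        // Consumer side: a read-only Stream that can be returned to the caller.
        return pipe.Reader.AsStream();
    }
}
```

Unlike the MemoryStream version, a read on the returned stream waits for data while the producer is still running, and only reports end of stream once the writer has been completed.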