Tags: c#, asynchronous, azure-cognitive-services, openai-api, azure-openai

yield not returning result immediately to caller API - C# 8, IAsyncEnumerable


I am using .NET 6. My use case is to stream data from "myapi" through a "middle api (BFF)" to a client app in React.

I have code in the "myapi" endpoint that should yield each result as soon as it receives it -

myapi code -


public async IAsyncEnumerable<string> GetStreamingResponse()
{
    var rawAzureOpenAIRequest = new CompletionsRequest();
    rawAzureOpenAIRequest.ModelToUse = DefaultTextModelToUse;

    CompletionsOptions optns = new CompletionsOptions();
    optns.Prompts.Add("add 6+1 :");
    optns.Prompts.Add("below is the summary of technical consultant role in software");

    // canToken is a CancellationToken available in the surrounding class
    var azResponse = await _openAIRepository.GetStreamingResponse(
        rawAzureOpenAIRequest.ModelToUse, optns, canToken);

    await foreach (var choice in azResponse.Value.GetChoicesStreaming())
    {
        await foreach (var message in choice.GetTextStreaming())
        {
            yield return message;
            await Task.Delay(10000); // delay added only to check whether control returns to the caller
        }
    }
}

My consuming "middle BFF api" is below. My issue is that it does not hit the breakpoint in the consuming API after each yield return, i.e. control does not return to the consuming API after each yield return. I want the consuming API to receive each message as soon as it is yielded from the first API above.

Consuming api code -

[HttpGet]
[Route("v1/testendpoint")]
public async Task Get()
{
    using HttpClient client = new();
    using HttpResponseMessage response = await client.GetAsync(
        "http://localhost...",
        HttpCompletionOption.ResponseHeadersRead
    ).ConfigureAwait(false);

    response.EnsureSuccessStatusCode();

    Stream responseStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false);

    IAsyncEnumerable<object> messages = JsonSerializer.DeserializeAsyncEnumerable<object>(
        responseStream,
        new JsonSerializerOptions
        {
            PropertyNameCaseInsensitive = true,
            DefaultBufferSize = 10
        });

    Response.Headers.Add("Content-Type", "text/event-stream");

    await foreach (var message in messages)
    {
        // breakpoint here is not hit after each yield from "myapi"
        byte[] messageBytes = Encoding.ASCII.GetBytes("data:" + message + "\n\n");
        await Response.Body.WriteAsync(messageBytes, 0, messageBytes.Length);
        await Response.Body.FlushAsync();
    }
}

Could someone please explain why this is happening?

I have tried adding a delay to check whether control returns to the consuming API after each yield return, but it does not.

I also tried hitting the first (yielding) API directly with the client-side code below, and it yields in batches.

fetch("http://localhost:7200/v1...", config)
      .then(async response => {
        const reader = response.body?.getReader();
        if (!reader) {
          return;
        }
        const decoder = new TextDecoder();
        while (true) {
          const { done, value } = await reader.read();
          if (done) break;
          var item = decoder.decode(value).replace(/\[|]/g, '').replace(/^,/, '');

          var parsedItem = JSON.parse(item);
          console.log(item + "\n");
          debugger;

        }
        reader.releaseLock();
      }, (reason) => {
        console.log(reason);
        debugger;
      });

In the first (sending) API, the GetTextStreaming method has the following definition (screenshot not reproduced).

UPDATE:

Now trying to return the stream directly - myapi code:

public async Task<Stream> GetRawStreamingCompletionResponse()
{
    var rawAzureOpenAIRequest = new CompletionsRequest();
    rawAzureOpenAIRequest.ModelToUse = DefaultTextModelToUse;

    CompletionsOptions optns = new CompletionsOptions();
    optns.Prompts.Add("add 6+1 :");
    optns.Prompts.Add("below is the summary of technical consultant role in software");

    var azResponse = await _openAIRepository.GetStreamingResponse(
        rawAzureOpenAIRequest.ModelToUse, optns, canToken);

    return azResponse.GetRawResponse().ContentStream;
}

In the consuming API -

public async Task Get()
{
    var stream = await Client.GetStreamAsync("http://localhost...");
    Response.Headers.Add("Content-Type", "text/event-stream");
    await stream.CopyToAsync(this.Response.Body); // CopyToAsync must be awaited
    await Response.Body.FlushAsync();
}

Solution

  • I think I found the reasoning behind it. It depends on when the System.Text.Json library flushes the response body while serializing an IAsyncEnumerable. It does not flush the response body after every step of the async enumeration; rather, it flushes when an internal buffer-size limit of the JSON serializer is reached.

    On their page, the mentioned default buffer size is 16,384 bytes.

    I handled it by flushing the Response after every yield, as sketched below.
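
    A minimal sketch of that fix, assuming the producing API writes each message to the response itself (as server-sent events) instead of returning an IAsyncEnumerable for the framework to serialize. The route and the GetMessagesAsync source are placeholders, not the original code:

    // Hypothetical sketch: write each streamed message directly and flush per item,
    // so output is not held back by the serializer's internal buffer.
    [HttpGet]
    [Route("v1/streamingresponse")] // placeholder route
    public async Task GetStreamingResponse(CancellationToken cancellationToken)
    {
        Response.Headers.Add("Content-Type", "text/event-stream");

        // GetMessagesAsync stands in for whatever produces the streamed strings
        // (e.g. the await foreach over GetChoicesStreaming/GetTextStreaming above).
        await foreach (var message in GetMessagesAsync(cancellationToken))
        {
            byte[] payload = Encoding.UTF8.GetBytes("data:" + message + "\n\n");
            await Response.Body.WriteAsync(payload, cancellationToken);
            await Response.Body.FlushAsync(cancellationToken); // flush after every yielded message
        }
    }

    Because the endpoint flushes after each message, the BFF (or the browser's fetch reader) should see every item as soon as it is produced rather than in buffered batches.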