Search code examples
c#async-awaitproducer-consumer

Async Queue implementation .Wait() faster than await


To provide producer-consumer functionality that can queue and execute async methods one after the other, I'm trying to implement an async queue. I noticed major performance issues using it in a large application.

async Task Loop() {
     while (Verify()) {
         if (!_blockingCollection.TryTake(out var func, 1000, _token)) continue;
         await func.Invoke();       
     }
}

Implementation of AsyncQueue.Add:

public void Add(Func<Task> func) {
    _blockingCollection.Add(func);
}

Example usage from arbitrary thread:

controller.OnEvent += (o, a) => _queue.Add(async (t) => await handle(a));

Execution paths' depend on the state of the application and include

  • async network requests that internally use TaskCompletionSource to return result

  • IO operations

  • tasks that get added to a list and are awaited using Task.WhenAll(...)

  • an async void method that converts an array and awaits a network request

Symptoms: The application slows down gradually.

When I replace await func.Invoke() with func.Invoke().Wait() instead of awaiting it properly, performance improves dramatically and it does not slow down.

Why is that? Is an async queue that uses BlockingCollection a bad idea?

What is a better alternative?


Solution

  • Why is that?

    There isn't enough information in the question to provide an answer to this.

    As others have noted, there's a CPU-consuming spin issue with the loop as it currently is.

    In the meantime, I can at least answer this part:

    Is an async Queue that uses BlockingCollection a bad idea?

    Yes.

    What is a better alternative?

    Use an async-compatible queue. E.g., Channels, or BufferBlock/ActionBlock from TPL Dataflow.

    Example using Channels:

    async Task Loop() {
      await foreach (var func in channelReader.ReadAllAsync()) {
        await func.Invoke();
      }
    }
    

    or if you're not on .NET Core yet:

    async Task Loop() {
      while (await channelReader.WaitToReadAsync()) {
        while (channelReader.TryRead(out var func)) {
          await func.Invoke();
        }
      }
    }