c# async-await concurrency task producer-consumer

Wait inside long running background task loop for condition/event to continue iterating?


Consider this toy example:

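using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class SomeClass
{
    private readonly ConcurrentQueue<int> _sharedData = [];
    private Task? _backgroundTask;

    // Assumed to be a simple display property; its declaration is not part of the toy example.
    public string Status { get; private set; } = "";

    private async Task Start()
    {
        CancellationTokenSource cts = new();
        _backgroundTask = Task.Run(() => BackgroundWork(cts.Token));

        // Main loop: take one item per iteration and show what is still queued.
        for (int i = 0; i < 100; i++)
        {
            _sharedData.TryDequeue(out int data);
            Status = string.Join(", ", _sharedData.Select(a => a.ToString()));
            await Task.Delay(300);
        }
        cts.Cancel();
    }

    private async Task BackgroundWork(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            // Queue is full: poll again after a short delay.
            if (_sharedData.Count >= 5)
            {
                await Task.Delay(250);
                continue;
            }

            _sharedData.Enqueue(Random.Shared.Next());
        }

        Status = "bg task done.";
    }
}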

Is there a "best practice" way to avoid Task.Delay() in the background work loop? I don't like that the loop keeps polling until the queue count drops below the threshold. I'd rather have some event trigger the continuation.

Context: In the real application, the main loop deals with files that are loaded from a central server. To keep waiting times low, I want to pre-cache the next couple of files while the user works on the current one.


Solution

  • It seems that you are looking for a Channel (System.Threading.Channels). Here is the outline:

    using System.Threading.Channels;
    
    ...
    
    // We want a channel (pipeline) which can hold at most 5 items
    // (let items be of type int for simplicity)
    var channel = Channel.CreateBounded<int>(5); // <- No more than 5 items
    
    // Process each item as soon as it becomes available in the channel.
    // Parallel.ForEachAsync may handle several items concurrently,
    // so the output order is not guaranteed.
    var allItemsTask = Parallel.ForEachAsync(channel.Reader.ReadAllAsync(), 
      async (item, token) => {
        // Actual item processing is here (await the real work)
        Console.WriteLine(item);
    });
    
    // Time to generate items to be processed
    for (var item = 0; item < 100; ++item) {
      // Note that the channel holds 5 items at most:
      // if items are generated too fast, WriteAsync waits
      // (without polling) until there is room again
      await channel.Writer.WriteAsync(item);
    }
    
    // No more items are expected
    channel.Writer.TryComplete();
    
    // Wait for all items to be processed
    await allItemsTask;
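
In the question the roles are the other way round: the background task produces (pre-caches files) and the main loop consumes. A minimal sketch of that arrangement follows, assuming a single producer and a single consumer. `PrefetchSketch`, `LoadFileAsync`, `ProduceAsync`, `RunAsync`, and the delays are hypothetical names and values made up for illustration; only the `Channel` calls come from the library.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class PrefetchSketch
{
    // Hypothetical stand-in for downloading one file from the central server.
    private static async Task<string> LoadFileAsync(int index, CancellationToken ct)
    {
        await Task.Delay(10, ct); // simulated network latency
        return $"file-{index}";
    }

    // Background producer: loads files and pushes them into the bounded channel.
    private static async Task ProduceAsync(
        ChannelWriter<string> writer, int fileCount, CancellationToken ct)
    {
        try
        {
            for (int i = 0; i < fileCount; i++)
            {
                string file = await LoadFileAsync(i, ct);
                // When the channel is full, this await suspends (no polling)
                // until the consumer has taken an item.
                await writer.WriteAsync(file, ct);
            }
            writer.TryComplete();
        }
        catch (Exception ex)
        {
            // Pass cancellation or a load failure on to the reader.
            writer.TryComplete(ex);
        }
    }

    // Main loop: consumes files one at a time, in the order they were written.
    public static async Task<List<string>> RunAsync(
        int fileCount, int capacity = 5, CancellationToken ct = default)
    {
        Channel<string> channel = Channel.CreateBounded<string>(capacity);
        Task producer = Task.Run(() => ProduceAsync(channel.Writer, fileCount, ct));

        List<string> processed = new();
        await foreach (string file in channel.Reader.ReadAllAsync(ct))
        {
            processed.Add(file);   // "the user works on the current file"
            await Task.Delay(30);  // simulated user work
        }

        await producer;
        return processed;
    }
}
```

Calling `await PrefetchSketch.RunAsync(100)` keeps up to 5 files buffered ahead of the consumer. The producer may additionally hold one already loaded file while it waits in `WriteAsync`, so the effective look-ahead can be one more than the capacity.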
    
