
How to set maximum concurrent threads while dequeuing ConcurrentQueue?


I have one thread responsible for enqueuing and one thread responsible for dequeuing. However, data is enqueued far faster than a single thread can dequeue and process it. With the following code, I ended up with a huge backlog in data processing:

public void HandleData()
{
    while (true)
    {
        try
        {
            if (Queue.TryDequeue(out var item))
            {
                ProcessData(item);
            }
            else
            {
                Thread.Sleep(10);
            }
        }
        catch (Exception e)
        {
            //...
        }
    }
}

Next, I tried processing the data in separate tasks, but this ended up affecting the rest of the project: the unbounded Task.Run calls consumed most of the resources allocated to the application and generated a very high thread count.

public void HandleData()
{
    while (true)
    {
        try
        {
            if (Queue.TryDequeue(out var item))
            {
                Task.Run(() => ProcessData(item));
            }
            else
            {
                Thread.Sleep(10);
            }
        }
        catch (Exception e)
        {
            //
        }
    }
}

Next, I tried the following:

public void HandleData()
{
    List<Task> taskList = new List<Task>();
    while (true)
    {
        try
        {
            if (Queue.TryDequeue(out var item))
            {
                if (taskList.Count <= 20)
                {
                    Task t = Task.Run(() => ProcessData(item));
                    taskList.Add(t);
                }
                else
                {
                    ProcessData(item);
                }
            }
            else
            {
                Thread.Sleep(10);
            }
            taskList.RemoveAll(x => x.IsCompleted);
        }
        catch (Exception e)
        {
            //...
        }
    }
}

This seems to have solved the problem, but I wonder if there is a cleaner way to do it. Is there a way to set a maximum number of concurrent threads while dequeuing?


Solution

  • ConcurrentQueue isn't the proper container, especially since it doesn't provide asynchronous operations. A better option would be using an ActionBlock or a Channel combined with Parallel.ForEachAsync.

    Using an ActionBlock

    An ActionBlock combines both an input queue and workers to process the data asynchronously. The workers process the data as soon as it's available. Using an ActionBlock, you can create a block with a set number of workers and start posting data to it. The block will use only the configured number of worker tasks to process the data:

    
    ActionBlock<Data> _block;

    public void Initialize()
    {
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 20
        };

        _block = new ActionBlock<Data>(ProcessData, options);
    }
    

    Data/messages are posted to the block using either the Post or SendAsync methods. When there's no more data, calling the Complete method tells the block to shut down after processing any pending items. We can wait for those pending items to finish by awaiting the Completion property:

    public async Task Produce(CancellationToken cancel)
    {
        while (!cancel.IsCancellationRequested)
        {
            var data = ProduceSomething();
            _block.Post(data);
        }
        _block.Complete();

        await _block.Completion;
    }
    
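    Note that Post on an unbounded block never blocks, so a fast producer can still grow the block's internal queue without limit. If that matters, the block also supports a BoundedCapacity option combined with SendAsync, which applies backpressure by making the producer wait when the queue is full. A minimal self-contained sketch (the Data type and ProcessData body here are hypothetical stand-ins):

    ```csharp
    using System;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    class Data { public int Id; }

    class BoundedExample
    {
        // Hypothetical stand-in for the real processing work
        static Task ProcessData(Data d) => Task.Delay(10);

        static async Task Main()
        {
            var options = new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 20,
                BoundedCapacity = 100   // at most 100 items buffered at once
            };
            var block = new ActionBlock<Data>(ProcessData, options);

            for (int i = 0; i < 1000; i++)
            {
                // SendAsync waits while the block is full, slowing the
                // producer down instead of letting the queue grow unbounded
                await block.SendAsync(new Data { Id = i });
            }

            block.Complete();
            await block.Completion;
        }
    }
    ```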

    Using a Channel

    Another option is to use a Channel instead of a ConcurrentQueue. A Channel is essentially an asynchronous ConcurrentQueue that exposes its contents as an IAsyncEnumerable<T> stream that can be iterated with await foreach. You can create a specific number of workers that read either from the channel itself or from the IAsyncEnumerable<T> stream. In .NET 6, this last part is much easier using Parallel.ForEachAsync with a fixed degree-of-parallelism option:

    
    async Task<ChannelReader<T>> Producer<T>(CancellationToken token)
    {
        var channel = Channel.CreateUnbounded<T>();

        var writer = channel.Writer;

        while (!token.IsCancellationRequested)
        {
            var someData = ProduceData();
            await writer.WriteAsync(someData);
        }
        writer.Complete();

        return channel.Reader;
    }
    
    
    async Task Consumer<T>(ChannelReader<T> input, int dop = 20)
    {
        ParallelOptions parallelOptions = new()
        {
            MaxDegreeOfParallelism = dop
        };

        await Parallel.ForEachAsync(input.ReadAllAsync(), parallelOptions,
                                    async (data, token) => await ProcessData(data));
    }
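
    One caveat when wiring the two together: since Producer only returns the reader after its loop finishes, a consumer awaiting that result can't start until all the data has been produced. To let production and consumption overlap, create the channel up front and run both sides concurrently. A sketch under those assumptions, with hypothetical ProduceData/ProcessData stand-ins:

    ```csharp
    using System;
    using System.Threading;
    using System.Threading.Channels;
    using System.Threading.Tasks;

    class PipelineExample
    {
        // Hypothetical stand-ins for the real produce/process work
        static int ProduceData() => Random.Shared.Next(100);
        static Task ProcessData(int data) => Task.Delay(10);

        static async Task Main()
        {
            // Create the channel up front so both sides can run at once
            var channel = Channel.CreateUnbounded<int>();
            using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1));

            var producer = Task.Run(async () =>
            {
                while (!cts.Token.IsCancellationRequested)
                {
                    await channel.Writer.WriteAsync(ProduceData());
                }
                // Signals the consumer that no more data is coming
                channel.Writer.Complete();
            });

            var options = new ParallelOptions { MaxDegreeOfParallelism = 20 };
            var consumer = Parallel.ForEachAsync(
                channel.Reader.ReadAllAsync(), options,
                async (data, token) => await ProcessData(data));

            await Task.WhenAll(producer, consumer);
        }
    }
    ```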