Search code examples
c#multithreadingtask-parallel-libraryproducer-consumer

TPL Producer Consumer in a FIFO order C#


I'm limited to .NET 3.5 and I'm using TPL. The scenario is producer-consumer but there is no problem of blocking. PLINQ cannot be used in this scenario (because of limitations) and what we want to achieve is the fastest way to produce many items (where each production is a long-running one, and the number of items exceeds 100,000) but each item must be consumed in a FIFO order (which means, the first item I asked to be produced must be consumed first, even if it was created after other items) and also consumed as fast as possible.

For this problem I tried using a task list, wait for the first item in the list to be completed (taskList.First().IsCompleted()) and then using the consuming function on it, but for some reason I seem to run out of memory (maybe too many items in the task list because of tasks waiting to start?) Is there any better way to do that? (I'm trying to achieve the fastest possible)

Many thanks!


Solution

  • OK after the edit - instead of adding the results in the BlockingCollection, add the Tasks in the blocking collection. This has the feature where the items are processed in order AND there is a maximum parallelism which will prevent too many threads from kicking off and you eating up all your memory.

    https://dotnetfiddle.net/lUbSqB

    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Collections.Generic;
    
    public class Program
    {
        private static BlockingCollection<Task<int>> BlockingCollection {get;set;}  
    
        public static void Producer(int numTasks)
        {
            Random r = new Random(7);
            for(int i = 0 ; i < numTasks ; i++)
            {
                int closured = i;
                Task<int> task = new Task<int>(()=>
                { 
                    Thread.Sleep(r.Next(100));
                    Console.WriteLine("Produced: " + closured);
                    return closured;
                });
                BlockingCollection.Add(task);
                task.Start();
            }
            BlockingCollection.CompleteAdding();
        }
    
    
        public static void Main()
        {
            int numTasks = 20;
            int maxParallelism = 3;
    
            BlockingCollection = new BlockingCollection<Task<int>>(maxParallelism);
    
            Task.Factory.StartNew(()=> Producer(numTasks));
    
            foreach(var task in BlockingCollection.GetConsumingEnumerable())
            {
                task.Wait();
                Console.WriteLine("              Consumed: "+ task.Result);
                task.Dispose();
            }
    
        }
    }
    

    And the results:

    Produced: 0
                  Consumed: 0
    Produced: 1
                  Consumed: 1
    Produced: 3
    Produced: 2
                  Consumed: 2
                  Consumed: 3
    Produced: 4
                  Consumed: 4
    Produced: 6
    Produced: 5
                  Consumed: 5
                  Consumed: 6
    Produced: 7
                  Consumed: 7
    Produced: 8
                  Consumed: 8
    Produced: 10
    Produced: 9
                  Consumed: 9
                  Consumed: 10
    Produced: 12
    Produced: 13
    Produced: 11
                  Consumed: 11
                  Consumed: 12
                  Consumed: 13
    Produced: 15
    Produced: 14
                  Consumed: 14
                  Consumed: 15
    Produced: 17
    Produced: 16
    Produced: 18
                  Consumed: 16
                  Consumed: 17
                  Consumed: 18
    Produced: 19
                  Consumed: 19