Tags: c#, .net, task-parallel-library, async-await, blockingcollection

BlockingCollection: process N items at a time, continuing as soon as one is done


I have the following scenario:

  1. I take 50 jobs from the database into a blocking collection.

  2. Each job is potentially long-running, so I want to run each one on a separate thread. (I know it may be better to start them all, wait with Task.WhenAll and let the TPL figure it out, but I want to control how many run simultaneously.)

  3. Say I want to run 5 of them simultaneously (configurable)

  4. I create 5 tasks (TPL), one for each job and run them in parallel.

What I want to do is pick up the next job from the blocking collection as soon as one of the jobs from step 4 completes, and keep going until all 50 are done.

I am thinking of creating a static BlockingCollection and a TaskCompletionSource that is signalled when a job completes, so that the consumer can be called again to pick up one job at a time from the queue. I would also like to use async/await in each job, but that comes on top of this, and I am not sure whether it affects the approach.

Is this the right way to accomplish what I'm trying to do?

This is similar to this link, but the catch is that I want to process the next job as soon as any one of the first N items is done, not after all N are done.
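For reference, the behaviour asked for here (N workers, each taking the next job as soon as it finishes its current one) can be sketched with a BlockingCollection<T> and N long-running consumer tasks. This is only a minimal sketch under assumptions: the BoundedWorkers class, the RunAll method and the integer job ids are hypothetical stand-ins for the real Job type and handler, and the peak counter exists only to show that the limit holds.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original question.
public static class BoundedWorkers
{
    // Runs jobCount jobs with at most workerCount jobs in flight.
    // Returns the highest number of jobs observed running at once.
    public static int RunAll(int jobCount, int workerCount, Action<int> runJob)
    {
        int running = 0, peak = 0;
        object gate = new object();

        using (var queue = new BlockingCollection<int>())
        {
            for (int id = 0; id < jobCount; id++)
                queue.Add(id);
            queue.CompleteAdding(); // no more jobs: consumers exit once the queue drains

            // One long-running task per worker. Each worker loops and takes
            // the next job the moment it finishes its current one.
            Task[] workers = Enumerable.Range(0, workerCount)
                .Select(_ => Task.Factory.StartNew(() =>
                {
                    foreach (int id in queue.GetConsumingEnumerable())
                    {
                        lock (gate) { running++; peak = Math.Max(peak, running); }
                        try { runJob(id); }
                        finally { lock (gate) { running--; } }
                    }
                },
                CancellationToken.None,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default))
                .ToArray();

            Task.WaitAll(workers);
        }

        return peak;
    }
}
```

A call such as BoundedWorkers.RunAll(50, 5, id => DoWork(id)), where DoWork is whatever the job does, never has more than 5 jobs running, and a worker that finishes a short job moves straight on to the next one while the long jobs keep running.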

Update:

OK, the following code snippet does exactly what I want, in case someone wants to use it later. As the output below shows, 5 threads are created and each thread starts the next job as soon as it finishes its current one. Only 5 threads are active at any given time. I understand this may not always behave exactly like this, and that context switching will cost performance on a single CPU/core.

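// ActionBlock<T> lives in the System.Threading.Tasks.Dataflow namespace.
var block = new ActionBlock<Job>(
    job => Handler.HandleJob(job),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

// The block is unbounded by default, so every job is accepted immediately
// and Post can be used instead of an un-awaited SendAsync.
foreach (Job j in GetJobs())
    block.Post(j);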

Job 2 started on thread :13. wait time:3600000ms. Time:8/29/2014 3:14:43 PM

Job 4 started on thread :14. wait time:15000ms. Time:8/29/2014 3:14:43 PM

Job 0 started on thread :7. wait time:600000ms. Time:8/29/2014 3:14:43 PM

Job 1 started on thread :12. wait time:900000ms. Time:8/29/2014 3:14:43 PM

Job 3 started on thread :11. wait time:120000ms. Time:8/29/2014 3:14:43 PM

job 4 finished on thread :14. 8/29/2014 3:14:58 PM

Job 5 started on thread :14. wait time:1800000ms. Time:8/29/2014 3:14:58 PM

job 3 finished on thread :11. 8/29/2014 3:16:43 PM

Job 6 started on thread :11. wait time:1200000ms. Time:8/29/2014 3:16:43 PM

job 0 finished on thread :7. 8/29/2014 3:24:43 PM

Job 7 started on thread :7. wait time:30000ms. Time:8/29/2014 3:24:43 PM

job 7 finished on thread :7. 8/29/2014 3:25:13 PM

Job 8 started on thread :7. wait time:100000ms. Time:8/29/2014 3:25:13 PM

job 8 finished on thread :7. 8/29/2014 3:26:53 PM

Job 9 started on thread :7. wait time:900000ms. Time:8/29/2014 3:26:53 PM

job 1 finished on thread :12. 8/29/2014 3:29:43 PM

Job 10 started on thread :12. wait time:300000ms. Time:8/29/2014 3:29:43 PM

job 10 finished on thread :12. 8/29/2014 3:34:43 PM

Job 11 started on thread :12. wait time:600000ms. Time:8/29/2014 3:34:43 PM

job 6 finished on thread :11. 8/29/2014 3:36:43 PM

Job 12 started on thread :11. wait time:300000ms. Time:8/29/2014 3:36:43 PM

job 12 finished on thread :11. 8/29/2014 3:41:43 PM

Job 13 started on thread :11. wait time:100000ms. Time:8/29/2014 3:41:43 PM

job 9 finished on thread :7. 8/29/2014 3:41:53 PM

Job 14 started on thread :7. wait time:300000ms. Time:8/29/2014 3:41:53 PM

job 13 finished on thread :11. 8/29/2014 3:43:23 PM

job 11 finished on thread :12. 8/29/2014 3:44:43 PM

job 5 finished on thread :14. 8/29/2014 3:44:58 PM

job 14 finished on thread :7. 8/29/2014 3:46:53 PM

job 2 finished on thread :13. 8/29/2014 4:14:43 PM


Solution

  • You can easily achieve what you need using TPL Dataflow.

    What you can do is use a BufferBlock<T>, which is a buffer for storing your data, and link it to an ActionBlock<T>, which consumes the requests as they come in from the BufferBlock<T>.

    Now, the beauty here is that you can specify how many requests you want the ActionBlock<T> to handle concurrently using the ExecutionDataflowBlockOptions class.

    Here's a simplified console version, which processes a bunch of numbers as they come in and prints each number together with Thread.CurrentThread.ManagedThreadId:

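    using System;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks.Dataflow;

    internal static class Program
    {
        private static void Main(string[] args)
        {
            // Unbounded buffer that holds the incoming items.
            var bufferBlock = new BufferBlock<int>();

            // Consumer: handles at most 5 items concurrently.
            var actionBlock = new ActionBlock<int>(
                i => Console.WriteLine("Reading number {0} in thread {1}",
                                       i, Thread.CurrentThread.ManagedThreadId),
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

            bufferBlock.LinkTo(actionBlock);
            Produce(bufferBlock);

            // Keep the process alive while the items are processed in the background.
            Console.ReadKey();
        }

        private static void Produce(BufferBlock<int> bufferBlock)
        {
            foreach (var num in Enumerable.Range(0, 500))
            {
                bufferBlock.Post(num);
            }
        }
    }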
    

    You can also post them asynchronously if needed, using the awaitable SendAsync extension method (DataflowBlock.SendAsync).

    That way, you let the TPL handle all the throttling for you without needing to do it manually.
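To illustrate asynchronous posting together with async/await inside each job, here is a minimal sketch. It rests on assumptions: the ThrottledPipeline class and RunAsync method are hypothetical names, Task.Delay stands in for the real job, and the BoundedCapacity value is an arbitrary choice that makes SendAsync actually wait when the block is full. It also shows one way to wait for the work to finish (Complete followed by awaiting Completion) instead of Console.ReadKey.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper, not part of the original answer.
public static class ThrottledPipeline
{
    // Processes jobCount placeholder jobs, at most maxParallel at a time.
    // Returns the highest number of jobs observed in flight at once.
    public static async Task<int> RunAsync(int jobCount, int maxParallel)
    {
        int running = 0, peak = 0;
        object gate = new object();

        var block = new ActionBlock<int>(async id =>
        {
            lock (gate) { running++; peak = Math.Max(peak, running); }
            try
            {
                await Task.Delay(20); // stand-in for the real asynchronous job
            }
            finally
            {
                lock (gate) { running--; }
            }
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxParallel,
            BoundedCapacity = maxParallel * 2 // assumed value; gives back-pressure
        });

        for (int id = 0; id < jobCount; id++)
            await block.SendAsync(id); // completes once the block accepts the item

        block.Complete();       // signal that no more jobs will arrive
        await block.Completion; // wait for queued and in-flight jobs to finish
        return peak;
    }
}
```

With an async delegate, a slot is considered busy until the returned task completes, so MaxDegreeOfParallelism limits the number of jobs in flight rather than the number of threads. On .NET Framework the Dataflow types come from the System.Threading.Tasks.Dataflow NuGet package.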