Tags: c#, async-await, task-parallel-library, producer-consumer

Partitioning with IEnumerable source


I have a ConcurrentQueue declared as an IProducerConsumerCollection, i.e.:

IProducerConsumerCollection<Job> _queue = new ConcurrentQueue<Job>();

I also have a producer method that adds jobs to _queue and a consumer method that processes the jobs from _queue. In the consumer method, I'd like to process the jobs concurrently. Below is a sample class with the producer and consumer methods:

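using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;

public class TestQueue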
{
    IProducerConsumerCollection<Job> _queue = new ConcurrentQueue<Job>();
    private static HttpClient _client = new HttpClient();

    public TestQueue()
    {
        WorkProducerThread();
        WorkConsumerThread();
    }

    public void WorkConsumerThread()
    {
        if (_queue.Count > 0)
        {
            //At this point, 4 partitions are created but all records are in 1st partition only; 2,3,4 partition are empty
            var partitioner = Partitioner.Create(_queue).GetPartitions(4);

            Task t = Task.WhenAll(
             from partition in partitioner
             select Task.Run(async () =>
             {
                 using (partition)
                 {
                     while (partition.MoveNext())
                         await CreateJobs(partition.Current);
                 }
             }));

            t.Wait();

            //At this point, queue count is still 20, how to remove item from _queue collection when processed?
        }
    }

    private async Task CreateJobs(Job job)
    {
        HttpContent bodyContent = null;
        await _client.PostAsync("job", bodyContent);
    }



    public void WorkProducerThread()
    {
        if (_queue.Count == 0)
        {
            try
            {
                for (int i = 0; i < 20; i++)
                {
                    Job job = new Job { Id = i, JobName = "j" + i.ToString(), JobCreated = DateTime.Now };
                    _queue.TryAdd(job);
                }
            }
            catch (Exception ex)
            {
                //_Log.Error("Exception while adding jobs to collection", ex);
            }
        }
    }

}

public class Job
{
    public int Id { get; set; }
    public string JobName { get; set; }
    public DateTime JobCreated { get; set; }
}

There are two problems:

  1. Partitioner.Create(_queue).GetPartitions(4) creates 4 partitions, but all records are in the 1st partition only; partitions 2, 3 and 4 are empty. I can't figure out why this is happening. Ideally, each of the 4 partitions should hold 5 records, since there are 20 records in the queue in total. I read this article from MSDN on partitioning but didn't find any clue, and I also checked the partitioning example from this article.

  2. I also want to remove each item from _queue after it has been processed in the consumer method, and the only way to remove an item is the _queue.TryTake method. How do I remove items while also using partitioning?

I'm open to any alternative approach that achieves the same result.

Thanks in advance.


Solution

  • Partitioner.Create(_queue).GetPartitions(4) creates 4 partitions, but all records are in the 1st partition only; partitions 2, 3 and 4 are empty.

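    This is not correct; your queue entries are being partitioned correctly. Partitions created from an IEnumerable<T> are dynamic: each one pulls items from the shared source only when its MoveNext is called, so no item belongs to any partition before enumeration starts. To verify, change your processing logic slightly to log which partition is doing the work: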

    Task t = Task.WhenAll(
        from partition in partitioner.Select((jobs, i) => new { jobs, i })
        select Task.Run(async () =>
        {
            using (partition.jobs)
            {
                while (partition.jobs.MoveNext())
                {
                    Console.WriteLine(partition.i);
                    await CreateJobs(partition.jobs.Current);
                }
            }
        }));
    

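    You should see Console.WriteLine print values from 0 to 3, indicating that all four partitions are processing jobs. Don't expect exactly five jobs per partition, though: with the default options the partitioner hands out items on demand (in chunks), so a partition that finishes its work sooner can take more jobs.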
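To look at both problems in isolation, here is a small self-contained sketch (not from the original post). It drains a ConcurrentQueue through four partitions, records which partition handed out each item, and reports how many items are still in the queue afterwards. It assumes plain ints in place of Job and a Task.Delay in place of CreateJobs; the PartitionDemo name is made up for illustration.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class PartitionDemo
{
    // Drains a 20-item ConcurrentQueue<int> through 4 partitions and records
    // which partition index handed out each item (ints stand in for Job).
    public static (IReadOnlyDictionary<int, int> Owner, int RemainingInQueue) Run(
        EnumerablePartitionerOptions options)
    {
        var queue = new ConcurrentQueue<int>(Enumerable.Range(0, 20));
        IList<IEnumerator<int>> partitions = Partitioner.Create(queue, options).GetPartitions(4);

        var owner = new ConcurrentDictionary<int, int>(); // item -> partition index
        Task[] workers = partitions.Select((partition, index) => Task.Run(async () =>
        {
            using (partition)
            {
                // Items are pulled from the shared source only here, on MoveNext.
                while (partition.MoveNext())
                {
                    owner[partition.Current] = index;
                    await Task.Delay(10); // stand-in for CreateJobs
                }
            }
        })).ToArray();
        Task.WaitAll(workers);

        // Enumerating a ConcurrentQueue<T> reads a snapshot; nothing is dequeued.
        return (owner, queue.Count);
    }
}
```

Grouping Owner by its values shows how many items each partition handled; the exact split varies from run to run, while RemainingInQueue stays at 20, which is the second problem from the question. Passing EnumerablePartitionerOptions.NoBuffering makes each MoveNext take a single item, which tends to spread slow jobs more evenly than the default chunked mode.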

  • I also want to remove each item from _queue after it has been processed in the consumer method, and the only way to remove an item is the _queue.TryTake method. How do I remove items while also using partitioning?

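    You can achieve that with a slight rewrite. Partitioner.Create only enumerates the ConcurrentQueue, and enumerating a ConcurrentQueue<T> reads a snapshot without removing anything, which is why _queue.Count is still 20 afterwards. The main changes are switching to BlockingCollection<T> and adding the NuGet package that provides GetConsumingPartitioner, whose partitions take each item out of the collection as they consume it.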
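If you'd rather not take the package dependency, a partitioner with the same consuming behavior can be sketched on top of BlockingCollection<T>.GetConsumingEnumerable. This is a minimal sketch, not the package's actual source; the class name ConsumingPartitioner is made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical hand-rolled stand-in for the package's GetConsumingPartitioner.
public sealed class ConsumingPartitioner<T> : Partitioner<T>
{
    private readonly BlockingCollection<T> _collection;

    public ConsumingPartitioner(BlockingCollection<T> collection)
    {
        _collection = collection ?? throw new ArgumentNullException(nameof(collection));
    }

    // Partitions are created on demand and all pull from one shared collection.
    public override bool SupportsDynamicPartitions => true;

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        if (partitionCount < 1)
            throw new ArgumentOutOfRangeException(nameof(partitionCount));

        // One independent consuming enumerator per partition.
        return Enumerable.Range(0, partitionCount)
                         .Select(_ => _collection.GetConsumingEnumerable().GetEnumerator())
                         .ToList();
    }

    // GetConsumingEnumerable removes every item it yields, so two partitions never
    // receive the same item; enumeration ends once CompleteAdding has been called
    // and the collection is empty.
    public override IEnumerable<T> GetDynamicPartitions() => _collection.GetConsumingEnumerable();
}
```

With this class, `_queue.GetConsumingPartitioner().GetPartitions(4)` in the code below would become `new ConsumingPartitioner<Job>(_queue).GetPartitions(4)`. Because a consuming enumerator blocks while the collection is empty, the producer must call CompleteAdding (as it does below), or the partitions never finish.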

    Give this a try:

    using System;
    using System.Collections.Concurrent;
    using System.Linq;
    using System.Net.Http;
    using System.Threading.Tasks;
    
    namespace Test
    {
        public class TestQueue
        {
            BlockingCollection<Job> _queue = new BlockingCollection<Job>();
            private static HttpClient _client = new HttpClient();
    
            public TestQueue()
            {
                WorkProducerThread();
                WorkConsumerThread();
            }
    
            public void WorkConsumerThread()
            {
                if (!_queue.IsCompleted)
                {
                    // Each partition removes jobs from _queue as it consumes them.
                    var partitioner = _queue.GetConsumingPartitioner().GetPartitions(4);
    
                    Task t = Task.WhenAll(
                     from partition in partitioner
                     select Task.Run(async () =>
                     {
                         using (partition)
                         {
                             while (partition.MoveNext())
                                 await CreateJobs(partition.Current);
                         }
                     }));
    
    
                    t.Wait();
    
                    Console.WriteLine(_queue.Count); // prints 0: every job was taken out of the collection
                }
            }
    
            private async Task CreateJobs(Job job)
            {
                //HttpContent bodyContent = null;
                //await _client.PostAsync("job", bodyContent);
                await Task.Delay(100); // simulates the HTTP call for this demo
            }
    
    
    
            public void WorkProducerThread()
            {
                if (_queue.Count == 0)
                {
                    try
                    {
                        for (int i = 0; i < 20; i++)
                        {
                            Job job = new Job { Id = i, JobName = "j" + i.ToString(), JobCreated = DateTime.Now };
                            _queue.TryAdd(job);
                        }
    
                        _queue.CompleteAdding();
                    }
                    catch (Exception ex)
                    {
                        //_Log.Error("Exception while adding jobs to collection", ex);
                    }
                }
            }
    
        }
    
        public class Job
        {
            public int Id { get; set; }
            public string JobName { get; set; }
            public DateTime JobCreated { get; set; }
        }
        class Program
        {
            static void Main(string[] args)
            {
                var g = new TestQueue();
    
                Console.ReadLine();
            }
        }
    }
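Since the question is open to alternatives, here is a sketch that skips partitioners entirely and keeps the original IProducerConsumerCollection<Job>: a fixed number of worker tasks each call TryTake until the collection is empty. It assumes the producer has finished adding before the consumer starts, as in the sample constructor; QueueWorkers and DrainAsync are names made up for this sketch.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class QueueWorkers
{
    // Starts `workerCount` tasks that each keep calling TryTake until the
    // collection is empty. TryTake removes the item, so every job is processed
    // once and the collection is drained when the returned task completes.
    // Assumes no more items are added while this runs.
    public static Task DrainAsync<T>(
        IProducerConsumerCollection<T> queue,
        int workerCount,
        Func<T, Task> process)
    {
        if (workerCount < 1)
            throw new ArgumentOutOfRangeException(nameof(workerCount));

        var workers = Enumerable.Range(0, workerCount).Select(_ => Task.Run(async () =>
        {
            while (queue.TryTake(out T item))
            {
                await process(item);
            }
        }));
        return Task.WhenAll(workers);
    }
}
```

In the question's class, WorkConsumerThread could then call `QueueWorkers.DrainAsync(_queue, 4, CreateJobs).Wait();`, after which _queue.Count is 0. If producing and consuming overlap, a worker stops as soon as it sees the queue momentarily empty, which is the situation the BlockingCollection version above handles.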