c# multithreading task producer-consumer blockingcollection

BlockingCollection with async task


I'm trying to correctly model a multithreaded single-producer/multi-consumer scenario in which a consumer can ask the producer for an item, but the producer needs to perform a time-consuming operation to produce it (think of executing a query or printing a document).

My goal is to ensure that consumers cannot ask the producer to produce an item concurrently. In my real-world use case the producer is a hardware controller, which must ensure that only one request at a time is sent to the hardware. Other concurrent requests must either wait or be rejected (I know how to reject them, so let's concentrate on making them wait).

I want the producer and each consumer to run in different threads.
I could not get clean code using only a BlockingCollection. I had to combine it with a SemaphoreSlim, otherwise consumers could run into race conditions.
I think it should work (in fact it worked well in all my tests), even though I'm not 100% sure about it.
This is my program:

Producer:

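using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Producer : IDisposable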
{
    //Explicit waiting item => I feel this should not be there
    private SemaphoreSlim _semaphore;

    private BlockingCollection<Task<string>> _collection;

    public Producer()
    {
        _collection = new BlockingCollection<Task<string>>(new ConcurrentQueue<Task<string>>(), 1);
        _semaphore = new SemaphoreSlim(1, 1);
    }

    public void Start()
    {
        Task consumer = Task.Factory.StartNew(() =>
        {
            try
            {
                while (!_collection.IsCompleted)
                {
                    Task<string> current = _collection.Take();
                    current.RunSynchronously(); //Is this bad?

                    //Signal the long running operation has ended => This is what I'm not happy about
                    _semaphore.Release();
                }
            }
            catch (InvalidOperationException)
            {
                Console.WriteLine("Adding was completed!");
            }
        });
    }

    public string GetRandomString(string consumerName)
    {
        Task<string> task = new Task<string>(() =>
        {
            //Simulate long running operation
            Thread.Sleep(100);
            return GetRandomString();
        });

        _collection.Add(task);

        //Wait for long running operation to complete => This is what I'm not happy about
        _semaphore.Wait();

        Console.WriteLine("Producer produced {0} by {1} request", task.Result, consumerName);
        return task.Result;
    }

    public void Dispose()
    {
        _collection.CompleteAdding();
    }

    private string GetRandomString()
    {
        var chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
        var random = new Random();
        var result = new string(Enumerable
            .Repeat(chars, 8)
            .Select(s => s[random.Next(s.Length)])
            .ToArray());
        return result;
    }
}

Consumer:

class Consumer
{
    Producer _producer;
    string _name;

    public Consumer(
        Producer producer,
        string name)
    {
        _producer = producer;
        _name = name;
    }

    public string GetOrderedString()
    {
        string produced = _producer.GetRandomString(_name);
        return String.Join(String.Empty, produced.OrderBy(c => c));
    }
}

Console application:

class Program
{
    static void Main(string[] args)
    {
        int consumerNumber = 5;
        int reps = 10;

        Producer prod = new Producer();
        prod.Start();

        Task[] consumers = new Task[consumerNumber];

        for (var cConsumers = 0; cConsumers < consumerNumber; cConsumers++)
        {
            Consumer consumer = new Consumer(prod, String.Format("Consumer{0}", cConsumers + 1));

            Task consumerTask = Task.Factory.StartNew((consumerIndex) =>
            {
                int cConsumerNumber = (int)consumerIndex;
                for (var counter = 0; counter < reps; counter++)
                {
                    string data = consumer.GetOrderedString();
                    Console.WriteLine("Consumer{0} consumed {1} at iteration {2}", cConsumerNumber, data, counter + 1);
                }
            }, cConsumers + 1);

            consumers[cConsumers] = consumerTask;
        }

        Task continuation = Task.Factory.ContinueWhenAll(consumers, (c) =>
        {
            prod.Dispose();
            Console.WriteLine("Producer/Consumer ended");
            Console.ReadLine();
        });

        continuation.Wait();
    }
}

What I'm concerned about is whether this is the correct way to approach the problem, or whether there is a better practice you can suggest.
I already googled and tried several proposed ideas, but every example I tried assumed that the producer could produce items immediately after they were requested... quite a rare situation in real-world applications :)
Any help is greatly appreciated.


Solution

  • If I understand you correctly, you want to ensure that only one task at a time is processed by the so-called "producer". With slight modifications to your code, you can do it like this:

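    using System;
    using System.Collections.Concurrent;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    
    internal class Producer : IDisposable {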
        private readonly BlockingCollection<RandomStringRequest> _collection;
    
        public Producer() {
            _collection = new BlockingCollection<RandomStringRequest>(new ConcurrentQueue<RandomStringRequest>());
        }
    
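        public void Start() {
            // LongRunning hints that the worker loop should get a dedicated thread
            Task.Factory.StartNew(() => {
                // GetConsumingEnumerable blocks while the queue is empty and ends
                // normally (without throwing) once CompleteAdding has been called
                // and the queue has been drained
                foreach (var request in _collection.GetConsumingEnumerable()) {
                    Thread.Sleep(100); // long work
                    request.SetResult(GetRandomString());
                }
                Console.WriteLine("Adding was completed!");
            }, TaskCreationOptions.LongRunning);
        }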
    
        public RandomStringRequest GetRandomString(string consumerName) {
            var request = new RandomStringRequest();
            _collection.Add(request);
            return request;            
        }
    
        public void Dispose() {
            _collection.CompleteAdding();
        }
    
        private string GetRandomString() {
            var chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
            var random = new Random();
            var result = new string(Enumerable
                .Repeat(chars, 8)
                .Select(s => s[random.Next(s.Length)])
                .ToArray());
            return result;
        }
    }
    
    internal class RandomStringRequest : IDisposable {
        private string _result;
        private ManualResetEvent _signal;
    
        public RandomStringRequest() {
            _signal = new ManualResetEvent(false);
        }
    
        public void SetResult(string result) {
            _result = result;
            _signal.Set();
        }
    
        public string GetResult() {
            _signal.WaitOne();
            return _result;
        }
    
        public bool TryGetResult(TimeSpan timeout, out string result) {
            result = null;
            if (_signal.WaitOne(timeout)) {
                result = _result;
                return true;
            }
            return false;
        }
    
        public void Dispose() {
            _signal.Dispose();
        }
    }
    
    internal class Consumer {
        private Producer _producer;
        private string _name;
    
        public Consumer(
            Producer producer,
            string name) {
            _producer = producer;
            _name = name;
        }
    
        public string GetOrderedString() {
            using (var request = _producer.GetRandomString(_name)) {
                // wait here for result to be prepared
                var produced = request.GetResult();
                return String.Join(String.Empty, produced.OrderBy(c => c));
            }
        }
    }
    

    Note that the producer is single-threaded and uses GetConsumingEnumerable. There is also no semaphore and there are no per-request Tasks. Instead, a RandomStringRequest is returned to the consumer, and a call to GetResult or TryGetResult blocks until the producer has set the result (or the timeout expires). You may also want to pass CancellationTokens in some places (for example to GetConsumingEnumerable).
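
To illustrate the last point, here is a minimal sketch of how a CancellationToken could be threaded through the same design. It is not part of the code above: CancellableProducer, CancellableRequest and Enqueue are hypothetical names, the caller is assumed to own the CancellationTokenSource, and the request uses ManualResetEventSlim instead of ManualResetEvent only because its Wait overload accepts a token. The fixed "done" result stands in for the real work.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical request type: same idea as RandomStringRequest, but waiting
// can be aborted through a CancellationToken.
internal sealed class CancellableRequest : IDisposable {
    private readonly ManualResetEventSlim _signal = new ManualResetEventSlim(false);
    private string _result;

    public void SetResult(string result) {
        _result = result;
        _signal.Set();
    }

    // Blocks until SetResult is called; throws OperationCanceledException
    // if the token is cancelled first.
    public string GetResult(CancellationToken token) {
        _signal.Wait(token);
        return _result;
    }

    public void Dispose() {
        _signal.Dispose();
    }
}

// Hypothetical producer: a single worker thread that stops waiting for
// requests when the caller-owned token is cancelled.
internal sealed class CancellableProducer : IDisposable {
    private readonly BlockingCollection<CancellableRequest> _collection =
        new BlockingCollection<CancellableRequest>();
    private readonly CancellationToken _token;
    private Task _worker;

    public CancellableProducer(CancellationToken token) {
        _token = token;
    }

    public void Start() {
        _worker = Task.Factory.StartNew(() => {
            try {
                // Passing the token makes the blocking enumeration abortable
                foreach (var request in _collection.GetConsumingEnumerable(_token)) {
                    Thread.Sleep(100); // stand-in for the long operation
                    request.SetResult("done");
                }
            }
            catch (OperationCanceledException) {
                // The token was cancelled while the worker was waiting for a request
            }
        }, TaskCreationOptions.LongRunning);
    }

    public CancellableRequest Enqueue() {
        var request = new CancellableRequest();
        _collection.Add(request, _token);
        return request;
    }

    // Assumes all consumers have finished before Dispose is called.
    public void Dispose() {
        _collection.CompleteAdding();
        if (_worker != null) {
            _worker.Wait(); // the worker swallows cancellation, so this does not throw
        }
        _collection.Dispose();
    }
}
```

A consumer would then call request.GetResult(token) with the same token. This matters because a request that is still queued when the token is cancelled is never completed by the worker, so a consumer waiting without the token would block forever.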