Search code examples
c#multithreadingwaitproducer-consumerblockingcollection

Multithreading BlockingCollection Alternatives to GetConsumingEnumerable() Producer-Consumer


I have a situation where I have multiple producers and multiple consumers. The producers enters a job into a queue. I chose the BlockingCollection and it works great since I need the consumers to wait for a job to be found. However, if I use the GetConsumingEnumerable() feature the order of the items in the collection change... this is not what I need.

It even says in MSDN http://msdn.microsoft.com/en-us/library/dd287186.aspx that it does not preserve the order of the items.

Does anyone know an alternative for this situation?

I see that the Take method is available but does it also provide a 'wait' condition for the consumer threads?

It says http://msdn.microsoft.com/en-us/library/dd287085.aspx

'A call to Take may block until an item is available to be removed.' Is it better to use TryTake? I really need the thread to wait and keep checking for a job.


Solution

  • Take blocks the thread till something comes available.

    TryTake as the name implies tries to do so but returns a bool if it fails or succeeds. Allowing for more flex using it:

    while(goingOn){
       if( q.TryTake(out var){
          Process(var)
       }
       else{
          DoSomething_Usefull_OrNotUseFull_OrEvenSleep();
       }
    }
    

    instead of

    while(goingOn){
       if( var x = q.Take(){
          //w'll wait till this ever will happen and then we:
          Process(var)
       }
    }
    

    My votes are for TryTake :-)

    EXAMPLE:

        public class ProducerConsumer<T> {
    
            public struct Message {
                public T Data;
            }
    
            private readonly ThreadRunner _producer;
            private readonly ThreadRunner _consumer;
    
            public ProducerConsumer(Func<T> produce, Action<T> consume) {
                var q = new BlockingCollection<Message>();
                _producer = new Producer(produce,q);
                _consumer = new Consumer(consume,q);
            }
    
            public void Start() {
                _producer.Run();
                _consumer.Run();
            }
    
            public void Stop() {
                _producer.Stop();
                _consumer.Stop();
            }
    
            private class Producer : ThreadRunner {
    
                public Producer(Func<T> produce, BlockingCollection<Message> q) : base(q) {
                    _produce = produce;
                }
    
                private readonly Func<T> _produce;
    
                public override void Worker() {
                    try {
                        while (KeepRunning) {
                            var item = _produce();
                            MessageQ.TryAdd(new Message{Data = item});
                        }
                    }
                    catch (ThreadInterruptedException) {
                        WasInterrupted = true;
                    }
                }
            }
    
            public abstract class ThreadRunner {
    
                protected readonly BlockingCollection<Message> MessageQ;
    
                protected ThreadRunner(BlockingCollection<Message> q) {
                    MessageQ = q;
                }
    
                protected Thread Runner;
                protected bool KeepRunning = true;
    
                public bool WasInterrupted;
    
                public abstract void Worker();
    
                public void Run() {
                    Runner = new Thread(Worker);
                    Runner.Start();
                }
    
                public void Stop() {
                    KeepRunning = false;
                    Runner.Interrupt();
                    Runner.Join();
                }
    
            }
    
            class Consumer : ThreadRunner {
    
                private readonly Action<T> _consume;
    
                public Consumer(Action<T> consume,BlockingCollection<Message> q) : base(q) {
                    _consume = consume;
                }
    
                public override void Worker() {
                    try {
                        while (KeepRunning) {
                            Message message;
                            if (MessageQ.TryTake(out message, TimeSpan.FromMilliseconds(100))) {
                                _consume(message.Data);
                            }
                            else {
                                //There's nothing in the Q so I have some spare time...
                                //Excellent moment to update my statisics or update some history to logfiles
                                //for now we sleep:
                                Thread.Sleep(TimeSpan.FromMilliseconds(100));
                            }
                        }
                    }
                    catch (ThreadInterruptedException) {
                        WasInterrupted = true;
                    }
                }
            }
        }
    
    }
    

    USAGE:

    [Fact]
    public void ConsumerShouldConsume() {
    
        var produced = 0;
        var consumed = 0;
    
        Func<int> produce = () => {
            Thread.Sleep(TimeSpan.FromMilliseconds(100));
            produced++;
            return new Random(2).Next(1000);
        };
    
        Action<int> consume = c => { consumed++; };
    
        var t = new ProducerConsumer<int>(produce, consume);
        t.Start();
        Thread.Sleep(TimeSpan.FromSeconds(5));
        t.Stop();
    
        Assert.InRange(produced,40,60);
        Assert.InRange(consumed, 40, 60);
    
    }