c# .net multithreading blockingcollection

Cancelling BlockingCollection.GetConsumingEnumerable() and processing what's left


I have one process generating work and a second process with a BlockingCollection<> that consumes that work. When I close my program, I need my consumer to stop consuming work, but I still need to quickly log the work that was pending but hadn't been consumed.

Right now, my consumer spawns a thread that has a foreach (<object> in BlockingCollection.GetConsumingEnumerable()) loop. When I stop my program, my producer calls Consumer.BlockingCollection.CompleteAdding(). What I find is that my consumer continues to process everything in the queue.
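The behavior described above can be reproduced in isolation. The following is a minimal sketch, not the code from the question: the class name CompleteAddingDemo and the item count are made up for illustration. It shows that CompleteAdding() only marks the collection as closed to new items, so a plain GetConsumingEnumerable() loop still hands out everything that is already queued.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original question.
public static class CompleteAddingDemo
{
    // Returns how many items the consumer processed after CompleteAdding() was called.
    public static int Run()
    {
        var queue = new BlockingCollection<int>();
        for (int i = 0; i < 5; i++)
        {
            queue.Add(i); // five pending items, nothing consumed yet
        }

        // Signal that no more items will be added. Pending items are not discarded.
        queue.CompleteAdding();

        int processed = 0;
        Task consumer = Task.Run(() =>
        {
            // The loop ends only once the collection is both completed and empty.
            foreach (int item in queue.GetConsumingEnumerable())
            {
                processed++;
            }
        });
        consumer.Wait();
        return processed;
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // prints 5
    }
}
```

So CompleteAdding() on its own is a "finish the backlog, then stop" signal, which is why a separate mechanism is needed to stop consuming early.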

Googling the issue tells me that I need to use a CancellationToken. So I tried that out:

private void Process () { // This method runs in a separate thread
    try {
        // Consume work until the token is cancelled
        foreach (var work in BlockingCollection.GetConsumingEnumerable(CancellationToken)) {
            // Consume
        }
    }
    catch (OperationCanceledException) {
        // Cancelled: go through whatever is still queued
        foreach (var work in BlockingCollection.GetConsumingEnumerable()) {
            // quickly log
        }
    }
}

My producer has:

private CancellationTokenSource StopFlag = new CancellationTokenSource ();
MyConsumer.CancellationToken = StopFlag.Token;
// Make the consumer spawn its consuming thread...
StopFlag.Cancel ();
MyConsumer.BlockingCollection.CompleteAdding ();

When I try this, I get no indication that the OperationCanceledException was ever thrown.

This question tries to explain the use of a cancellation token, but it seems like it's not using it correctly. (Argument: if it works, then it's "correct enough".) And this question would appear to be an exact duplicate of my question but with no example. (Same here.)

So to reiterate: How do I properly use a CancellationToken on BlockingCollection.GetConsumingEnumerable() with the caveat that I need to process the remaining items in the queue after it gets cancelled using a different method?

(I think my problem is centered on the proper use of the CancellationToken. None of my testing indicates that the process is actually being cancelled: StopFlag.IsCancellationRequested always equals false.)


Solution

  • My problem was in how I was trying to cancel the operation. Instead of having my producer own the CancellationTokenSource, I put it all in the consumer.

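    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;

    public class cProducer {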
        private cConsumer myConsumer = new cConsumer ();
    
        public void onStart () {
            myConsumer.OnStart ();
        }
    
        public void onStop () {
            myConsumer.OnStop ();
        }
    
        public void OnOrderReceived (cOrder newOrder) {
            myConsumer.orderQueue.Add (newOrder);
        }
    }
    
    public class cConsumer {
        private CancellationTokenSource stopFlag;
        public BlockingCollection<cOrder> orderQueue = new BlockingCollection<cOrder> ();
        private Task processingTask;
    
        public void OnStart () {
            stopFlag = new CancellationTokenSource ();
            processingTask = Task.Factory.StartNew (() => Process ());
        }
    
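        public void OnStop () {
            stopFlag.Cancel ();            // the token-aware loop in Process () throws on its next step
            orderQueue.CompleteAdding ();  // lets the logging loop end once the queue is empty
            processingTask.Wait ();        // block until the remaining orders have been logged
        }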
    
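        private void Process () {
            try {
                // Normal operation: blocks waiting for orders until stopFlag is cancelled
                foreach (cOrder newOrder in orderQueue.GetConsumingEnumerable (stopFlag.Token)) {
                    // process
                }
            }
            catch (OperationCanceledException) {
                // Cancelled: unprocessed orders are still in the queue. No token here,
                // so this loop runs until the queue is empty and CompleteAdding () was called.
                foreach (cOrder cancelledOrder in orderQueue.GetConsumingEnumerable ()) {
                    // log it
                }
            }
        }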
    }
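To try the pattern above without the producer and consumer classes, here is a self-contained sketch under some assumptions: the class name CancelAndDrainDemo, the int payload, the 50 ms simulated work, and the two result lists are all invented for illustration. It queues ten items, cancels after the first one has been picked up, and records which items were processed normally and which were only logged.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class; disposal of the collection and token source is omitted for brevity.
public static class CancelAndDrainDemo
{
    public static (List<int> Processed, List<int> Logged) Run()
    {
        var queue = new BlockingCollection<int>();
        var stopFlag = new CancellationTokenSource();
        var processed = new List<int>(); // touched only by the consumer until it finishes
        var logged = new List<int>();
        var firstItemSeen = new ManualResetEventSlim(false);

        for (int i = 0; i < 10; i++)
        {
            queue.Add(i);
        }

        Task consumer = Task.Run(() =>
        {
            try
            {
                // Token-aware loop: throws OperationCanceledException once stopFlag is cancelled.
                foreach (int item in queue.GetConsumingEnumerable(stopFlag.Token))
                {
                    processed.Add(item);
                    firstItemSeen.Set();
                    Thread.Sleep(50); // simulated slow work (assumption)
                }
            }
            catch (OperationCanceledException)
            {
                // Items not yet taken are still queued. Without a token this loop
                // ends when the queue is empty and CompleteAdding() has been called.
                foreach (int item in queue.GetConsumingEnumerable())
                {
                    logged.Add(item);
                }
            }
        });

        firstItemSeen.Wait();     // make sure the consumer is actually running
        stopFlag.Cancel();        // stop normal processing
        queue.CompleteAdding();   // allow the drain loop to terminate
        consumer.Wait();
        return (processed, logged);
    }

    public static void Main()
    {
        var (processed, logged) = Run();
        Console.WriteLine($"processed={processed.Count}, logged={logged.Count}");
    }
}
```

The split between the two counts depends on timing, but together they should always add up to the ten queued items, since every item is either processed or logged. If blocking in the catch block is a concern, a loop over orderQueue.TryTake (out item) is an alternative way to drain the queue without waiting for CompleteAdding ().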