I have a simple producer/consumer setup using a BlockingCollection. The consumer sits in a loop for the lifetime of our application, waiting for the producer to put items in the collection, then takes each item out and writes it to a serial port. For some reason collection.Take() blocks forever even when there are items in the collection. In this app we might have one or many of these ProducerConsumers active at a time. They behave the same regardless.
public class ProducerConsumer
{
    private Task _backgroundWorker;
    private CancellationTokenSource _cancellationTokenSource;
    private BlockingCollection<Data> _dataQueue;

    public ProducerConsumer()
    {
        _dataQueue = new BlockingCollection<Data>();
        _cancellationTokenSource = new CancellationTokenSource();
        _backgroundWorker = new Task(() => DoWork(_cancellationTokenSource.Token), TaskCreationOptions.LongRunning);
        _backgroundWorker.Start();
    }

    public void AddData(Data data)
    {
        _dataQueue.Add(data);
        System.Diagnostics.Debug.WriteLine(_dataQueue.Count);
    }

    private void DoWork(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                _dataQueue.Take(cancellationToken); //This is blocking forever
                //DoWork
            }
            catch (OperationCanceledException) { }
            catch (Exception e)
            {
                System.Diagnostics.Debug.WriteLine(e.ToString());
                throw;
            }
        }
    }
}
When running this the print statement is incrementing so we definitely have data in the Collection, but for whatever reason Take() continues to block.
It isn't throwing an Exception either.
Cancellation is requested in Dispose(), which I didn't include here. It isn't being called early.
I have tried using .GetConsumingEnumerable() and that also blocks forever.
Am I starting the Task wrong? Could I be running out of threads?
I've considered using a BackgroundWorker instead of a Task, but according to MSFT Task is preferred.
Thanks in advance.
First of all, I wouldn't try to create my own producer/consumer implementation, especially not one that blocks. The simple producer/consumer scenario can be handled easily with an ActionBlock from the TPL Dataflow library (System.Threading.Tasks.Dataflow). The ActionBlock has an internal queue to which multiple concurrent producers can post messages. The ActionBlock will process the queued messages in the background using the worker method passed to its constructor:
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class SerialWorker
{
    ActionBlock<Data> _serialBlock;

    public SerialWorker()
    {
        _serialBlock = new ActionBlock<Data>(data => DoWork(data));
    }

    //The worker action can be synchronous
    private void DoWork(Data data)
    {
    }

    //or asynchronous
    private async Task DoWorkAsync(Data data)
    {
    }

    //Producer Code
    //While the application runs :
    public void PostData(Data data)
    {
        _serialBlock.Post(data);
    }

    //When the application finishes
    //Tell the block to shut down and wait for it to process any leftover requests
    public async Task Shutdown()
    {
        _serialBlock.Complete();
        await _serialBlock.Completion;
    }
}
The worker method can be async, e.g. new ActionBlock<Data>(data=>DoWorkAsync(data)) will work just fine. This allows the use of asynchronous methods without blocking inside the worker itself.
New messages are posted with ActionBlock.Post. When it's time to shut down, the application should call Complete() to notify the ActionBlock and then await its Completion task. The ActionBlock will stop accepting new messages and process anything still left in its buffer before terminating.
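To see the Post / Complete / Completion sequence end to end, here is a minimal self-contained sketch. It is only an illustration: the ActionBlockDemo class and RunAsync method are hypothetical names, int stands in for the Data type from the question, and Task.Delay stands in for the serial-port write. It assumes C# 7.1 or later for async Main and, on .NET Framework, the System.Threading.Tasks.Dataflow NuGet package.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class, not part of the question's code.
public static class ActionBlockDemo
{
    // Posts `count` integers to an ActionBlock and returns them
    // in the order the block processed them.
    public static async Task<List<int>> RunAsync(int count)
    {
        var processed = new List<int>();

        // Default options: unbounded input queue and one message
        // processed at a time, so the list needs no locking here.
        var block = new ActionBlock<int>(async item =>
        {
            await Task.Delay(10); // stand-in for an asynchronous serial-port write
            processed.Add(item);
        });

        for (var i = 0; i < count; i++)
        {
            // Post returns false once the block no longer accepts messages.
            block.Post(i);
        }

        block.Complete();       // no more input will be accepted
        await block.Completion; // finishes after the queued items are processed
        return processed;
    }

    public static async Task Main()
    {
        var result = await RunAsync(5);
        Console.WriteLine(string.Join(", ", result));
    }
}
```

With the default options the block handles one message at a time in the order posted, so the items should come back in posting order. If several writes may safely run concurrently, ExecutionDataflowBlockOptions.MaxDegreeOfParallelism can be raised, at the cost of that ordering.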