I have a situation where I need a large number (hundreds) of queues, where the items in each queue must be processed in order (so each queue needs a single-threaded consumer). In my first implementation, based on the samples, I used a single long-running Task per BlockingCollection to consume the queue items. However, I ended up with an application containing hundreds of threads sitting mostly idle, doing nothing but consuming memory, since the queues are empty most of the time.
I thought it would be better to have a consumer Task running only when there is something in the queue to process; however, I haven't been able to find samples that show what the best practice should be.
I came up with a solution similar to the one below. The problem is that every item results in a new Task (which may be inefficient and a waste of resources), but if I don't create a new Task for every item, I can't guarantee that an item won't be left sitting in the queue unprocessed.
private object _processSyncObj = new object();
private volatile bool _isProcessing;
private BlockingCollection<string> _queue = new BlockingCollection<string>();

private void EnqueueItem(string item)
{
    _queue.Add(item);
    Task.Factory.StartNew(ProcessQueue);
}

private void ProcessQueue()
{
    if (_isProcessing)
        return;

    lock (_processSyncObj)
    {
        string item;
        while (_isProcessing = _queue.TryTake(out item))
        {
            // process item
        }
    }
}
What is the best practice/best solution for this situation that guarantees there is never a state where an item is sitting in the queue but no consumer is running?
I think what you did is reasonable, because Tasks were designed to scale well even to millions of tasks: they are queued to the ThreadPool's internal per-thread queues, which avoids too much context switching.
Behind the scenes, tasks are queued to the ThreadPool, which has been enhanced with algorithms that determine and adjust to the number of threads and that provide load balancing to maximize throughput. This makes tasks relatively lightweight, and you can create many of them to enable fine-grained parallelism.
Task Parallelism (Task Parallel Library)
...but what you did ends up being plain Task programming, because you start a Task for every enqueue, so the BlockingCollection is barely used. As far as I understand, your concern is about firing a Task and letting the TaskScheduler run the jobs in the order they arrived.
Did you know you can also customize the TaskScheduler?
What about just using the Task programming pattern, plus a custom TaskScheduler to control the flow of the scheduled tasks?
For example, you can create an OrderedTaskScheduler that derives from a LimitedConcurrencyLevelTaskScheduler and behaves like this...
The LimitedConcurrencyLevelTaskScheduler class offers a task scheduler that ensures a maximum concurrency level while running on top of the ThreadPool. The maximum degree of parallelism desired is passed to its constructor.
The OrderedTaskScheduler class provides a task scheduler that ensures only one task is executing at a time. Tasks execute in the order that they were queued (FIFO). It is a subclass of LimitedConcurrencyLevelTaskScheduler that passes 1 to its base class constructor.
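In other words (a minimal sketch, assuming the LimitedConcurrencyLevelTaskScheduler from the MSDN / ParallelExtensionsExtras sample is available in your project), the ordered variant is little more than:

using System.Threading.Tasks;

// A minimal sketch: the "ordered" scheduler is just the limited-concurrency
// scheduler pinned to a single worker, so queued tasks run one at a time,
// in FIFO order. LimitedConcurrencyLevelTaskScheduler is assumed to be the
// class from the MSDN / ParallelExtensionsExtras sample.
public class OrderedTaskScheduler : LimitedConcurrencyLevelTaskScheduler
{
    public OrderedTaskScheduler() : base(1) { }
}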
You can find these schedulers already written; they are part of ParallelExtensionsExtras, which you can download from here, and you can read some thoughts about it in this blog post and others.
You can also find it directly on NuGet, and there is a code mirror on GitHub.
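For illustration, here is a rough sketch of how this could replace the per-queue BlockingCollection: keep one OrderedTaskScheduler per logical queue and schedule each item through it, so items for the same queue run one at a time in FIFO order and no dedicated thread sits idle. The OrderedProcessor, EnqueueItem, and ProcessItem names are made up for the example; the scheduler is the one sketched above (or the one shipped in ParallelExtensionsExtras).

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class OrderedProcessor
{
    // One scheduler per logical queue; each one runs at most one task at a
    // time and preserves FIFO order, so a queue's items are processed
    // sequentially without a dedicated long-running thread.
    private readonly ConcurrentDictionary<string, TaskScheduler> _schedulers =
        new ConcurrentDictionary<string, TaskScheduler>();

    public Task EnqueueItem(string queueKey, string item)
    {
        TaskScheduler scheduler =
            _schedulers.GetOrAdd(queueKey, _ => new OrderedTaskScheduler());

        // The StartNew overload that takes a TaskScheduler routes the work
        // through the ordered scheduler instead of the default one.
        return Task.Factory.StartNew(
            () => ProcessItem(item),
            CancellationToken.None,
            TaskCreationOptions.None,
            scheduler);
    }

    private void ProcessItem(string item)
    {
        // process item
    }
}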
Enjoy! :)