Hi, I am a student intern with little to no C# experience who was put in the position of taking over a Windows service that uses TaskCompletionSource and BlockingCollection to implement multithreading. I have never done C#. I am trying to optimize how the service handles its task, which is to crunch log files.
My question is: when using BlockingCollection to create a work queue that executes WorkItem objects, how can you get the count of active threads in the queue? That is, how many of the items submitted through EnqueueTask() are still in the running state? I don't want the size of the queue backlog, which is what _taskQ.Count returns. I want the count of active threads. I want to keep the thread count at four and only enqueue an item once a previous item is done. I don't want a backlog of items in my queue.
public class ProducerConsumerQueue
{
    public CancellationTokenSource Token { get; set; }
    private BlockingCollection<WorkItem> _taskQ;

    public ProducerConsumerQueue(int workerCount)
    {
        _taskQ = new BlockingCollection<WorkItem>();
        for (int i = 0; i < workerCount; i++)
        {
            Task.Factory.StartNew(Consume);
        }
    }

    public Task EnqueueTask(Action action, CancellationToken? cancelToken)
    {
        var tcs = new TaskCompletionSource<object>();
        _taskQ.Add(new WorkItem(tcs, action, cancelToken));
        return tcs.Task;
    }

    public void Consume()
    {
        foreach (WorkItem workItem in _taskQ.GetConsumingEnumerable())
        {
            if (workItem.CancelToken.HasValue &&
                workItem.CancelToken.Value.IsCancellationRequested)
            {
                workItem.TaskSource.SetCanceled();
            }
            else
            {
                try
                {
                    workItem.Action();
                    workItem.TaskSource.SetResult(null);
                }
                catch (OperationCanceledException ex)
                {
                    if (ex.CancellationToken == workItem.CancelToken)
                    {
                        workItem.TaskSource.SetCanceled();
                    }
                    else
                    {
                        workItem.TaskSource.SetException(ex);
                    }
                }
                catch (Exception ex)
                {
                    workItem.TaskSource.SetException(ex);
                }
            }
        }
    }
}
This ProducerConsumerQueue is created at service start, and its queue is reloaded at each service polling interval. I want to limit how many of these threads are spawned by tying the limit to the number of files in a thread-safe db table. So if the thread count is 4, the number of files in the db table will be 4. The queue shouldn't spawn additional threads or enqueue more files until one file is done. To do this, I figured a simple solution would be to count the number of active threads (which equals the number of files currently being crunched) and not add any new files until that count goes down by 1:
protected override void OnStart(string[] args)
{
    ProducerConsumerQueue = new ProducerConsumerQueue(Constants.THREAD_COUNT);
    InitializeLogging();
    PollOnServiceStart();
    _timer.Elapsed += OnElapsedTime;
    _timer.Enabled = true;
    _timer.Interval = _interval;
}

public void OnElapsedTime(object source, ElapsedEventArgs args)
{
    try
    {
        //InitializeLogging();
        Poll();
    }
    catch (Exception ex)
    {
        Logger.Error(ex.Message.ToString());
    }
}

public void PollOnServiceStart()
{
    foreach (var handler in handlers)
    {
        ProducerConsumerQueue.EnqueueTask(handler.Execute, CancellationTokenSource.Token);
    }
}
If you want a thread-safe counter to check the number of active tasks, you can use an int _counter field with Interlocked.Increment(ref _counter) and Interlocked.Decrement(ref _counter).
Remember to increment as the first line in a try block and decrement in a finally block, so that you don't lose any calls to either if an exception is raised.
https://learn.microsoft.com/en-us/dotnet/api/system.threading.interlocked.increment?view=net-5.0
https://learn.microsoft.com/en-us/dotnet/api/system.threading.interlocked.decrement?view=net-5.0
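The Interlocked counter described above could be wired into a consumer loop roughly as follows. This is only a minimal sketch, not the service's actual code: CountingQueue, ActiveCount, Enqueue, and CompleteAdding are hypothetical names, and the queue holds plain Action delegates instead of the WorkItem/TaskCompletionSource pair from the question to keep the example short.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, simplified stand-in for the ProducerConsumerQueue in the question.
public class CountingQueue
{
    private readonly BlockingCollection<Action> _taskQ = new BlockingCollection<Action>();

    // Number of work items that are executing right now (not the backlog).
    private int _activeCount;

    // Snapshot of the counter; it can change immediately after being read.
    public int ActiveCount => Volatile.Read(ref _activeCount);

    public CountingQueue(int workerCount)
    {
        for (int i = 0; i < workerCount; i++)
        {
            // LongRunning hints that each consumer should get a dedicated thread.
            Task.Factory.StartNew(Consume, TaskCreationOptions.LongRunning);
        }
    }

    public void Enqueue(Action action) => _taskQ.Add(action);

    // Lets the consumer loops exit once the queue has been drained.
    public void CompleteAdding() => _taskQ.CompleteAdding();

    private void Consume()
    {
        foreach (Action action in _taskQ.GetConsumingEnumerable())
        {
            try
            {
                // Increment as the first statement inside the try block.
                Interlocked.Increment(ref _activeCount);
                action();
            }
            catch (Exception)
            {
                // The original code reports failures through TaskCompletionSource;
                // that part is omitted in this sketch.
            }
            finally
            {
                // Always runs, so the counter cannot drift when an action throws.
                Interlocked.Decrement(ref _activeCount);
            }
        }
    }
}
```

A polling method could then compare ActiveCount with the worker limit before enqueuing more files. Note that an item that has been added but not yet picked up by a consumer is not counted as active, so a caller that wants the total number of in-flight files would probably need to look at the backlog count as well.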