I'm developing a fairly basic "Worker" application to run on a server: it picks up jobs from a SQL Server database, queues them in a BlockingCollection, and spawns a configured number of Tasks to consume jobs from the queue. What I'm finding is that in a release build of this application, my consumer tasks stop midway through their jobs without throwing any errors.
The code looks somewhat like this at the moment:
static void Main()
{
    int NumberOfConcurrentJobs = Convert.ToInt32(ConfigurationManager.AppSettings["NumberOfConcurrentJobs"]);
    int CollectionLimit = Convert.ToInt32(ConfigurationManager.AppSettings["MaxNumberOfQueueItems"]);

    /* Blocking collection for the jobs to be consumed */
    BlockingCollection<Classes.Job> blockingCollection = new BlockingCollection<Classes.Job>(new ConcurrentQueue<Classes.Job>(), CollectionLimit);

    /* Blocking collection holding an ID for each consumer task so that they can be identified */
    BlockingCollection<int> ConsumerIDCollection = new BlockingCollection<int>(NumberOfConcurrentJobs);

    /* Start the producer task */
    Task.Run(() =>
    {
        while (true)
        {
            /* Logic to populate the blocking collection goes here - it's working fine, so it's omitted to keep the snippet concise */
            Thread.Sleep(2000); // Artificial delay to avoid attempting to fill the blocking collection too frequently
        }
    });

    /* Start the consumer tasks */
    for (int i = 0; i < NumberOfConcurrentJobs; i++)
    {
        ConsumerIDCollection.Add(i + 1);

        /* Launch a task for each consumer */
        Task.Run(() =>
        {
            int ConsumerID = ConsumerIDCollection.Take();

            /* Loop forever, attempting to take a Job from the collection */
            /* I'll introduce the ability to cancel these tasks in the production code */
            while (true)
            {
                if (blockingCollection.TryTake(out Classes.Job job))
                {
                    try
                    {
                        Console.WriteLine(("(W) Consumer " + ConsumerID + ": Job " + job.JobID.ToString() + " taken...").PadRight(50) + "Processing.");
                        // Run the Job - executes "work" that could take seconds or hours.
                        // The work is a mixture of CPU and IO - processing data from variables in the job
                        // and creating files on a local drive.
                        job.RunWorker();
                        Console.WriteLine(("(W) Consumer " + ConsumerID + ": Job " + job.JobID.ToString() + " finished...").PadRight(50) + "Status " + job.Status + ".");
                    }
                    catch (Exception ex)
                    {
                        Common.WriteErrorLog(Common.LogType.Worker, "Consumer" + ConsumerID.ToString(), ex.Message);
                    }
                }
                Thread.Sleep(2000); // Wait before attempting to take the next job
            }
        });
    }

    Console.ReadKey();
}
The job.RunWorker() method is a normal non-async method with a void return; every action within it is synchronous. My objective with this code was simply to have, let's say, 4 concurrent "Jobs" being worked on, each within a Task. The test case I'm running has 4 concurrent Tasks, each consuming a job that creates 100 PDF files, each in its own directory.
This works without issue in a local debug build, albeit somewhat slowly. When I run a release version, the jobs all start successfully (and run much more quickly), but after a period of time the tasks seemingly stop without throwing any exceptions. At the point of stopping, each job has produced somewhere between 20 and 80 of its 100 files, so I know each one behaves as expected for at least the beginning of the process.
Monitoring server resources, I can see the CPU is still being used heavily by the application, but no more files are being created. I've left it running for half an hour and the work never resumes, so something is happening that I don't understand.
One scenario I've tested: when looking into this issue, I've seen a lot of discussion about using await, Task.WaitAll(), or Task.WhenAll() to ensure exceptions can be caught. I've tried adding all the consumer tasks to an array and calling Task.WaitAll() at the end of the Main method instead of Console.ReadKey(), but the app's behaviour doesn't change, so although that may be better practice, I don't think my issue lies there.
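For reference, a rough sketch of that Task.WaitAll() variant (trimmed to the relevant changes; the array name is illustrative):

/* Consumer tasks are kept in an array instead of being fire-and-forget */
Task[] consumerTasks = new Task[NumberOfConcurrentJobs];
for (int i = 0; i < NumberOfConcurrentJobs; i++)
{
    ConsumerIDCollection.Add(i + 1);
    consumerTasks[i] = Task.Run(() =>
    {
        /* Same consumer loop as in the snippet above */
    });
}
/* Blocks until all consumers finish; a faulted task would
   surface its exception here as an AggregateException */
Task.WaitAll(consumerTasks);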
I'm a little stumped on what's happening. I've no idea whether exceptions are being thrown that I'm not seeing, whether this is some operating-system behaviour holding the threads up, or something else entirely. I'm hoping someone might be able to point me in a direction to work out what I'm doing wrong here.
Thanks in advance.
I've since implemented a rough ActionBlock structure in my Main, as per the below.
ActionBlock<Job> workerBlock = new ActionBlock<Job>(async (job) =>
{
    Console.WriteLine(job.JobID.ToString() + " started...");
    try
    {
        await job.RunWorker();
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
        Common.WriteErrorLog(Common.LogType.Worker, job.JobID.ToString(), ex.Message);
    }
    finally
    {
        Console.WriteLine(job.JobID.ToString() + " done...");
    }
},
// Specify the number of parallel jobs and the backlog queue size
new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = NumberOfConcurrentJobs,
    BoundedCapacity = QueueLimit
});
OnErrorLog(workerBlock);
// Start the Producer loop
while (true)
{
    if (workerBlock.InputCount < QueueLimit)
    {
        List<int> JobIDs = ApiAction.GetJobsForWorker(QueueLimit);
        foreach (int JobID in JobIDs)
        {
            workerBlock.SendAsync(new Job(JobID)).Wait();
        }
        // Note: workerBlock.Complete() is not called here - it would mark the
        // block as accepting no further items, so it should only be called
        // once, when no more jobs will ever be produced.
    }
    Thread.Sleep(2000); // Wait before polling for more jobs
}
public static async void OnErrorLog(IDataflowBlock block)
{
    try
    {
        await block.Completion.ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        Console.WriteLine($"{block.GetType().Name} failed: {ex}");
    }
}
// Trimmed down example of the Job class and RunWorker() method
class Job
{
    public async Task RunWorker()
    {
        Task t = Task.Run(() =>
        {
            // file creation work
        });
        await t;
    }
}
Some observations:
1. You are launching your tasks (producer and consumers) as fire-and-forget tasks. This means that in case of failure, you'll not be informed about it. Specifically, if Common.WriteErrorLog fails, the exception will be captured in the Exception property of the task, and some time in the future (non-deterministically) it will trigger the TaskScheduler.UnobservedTaskException event, to which I assume you have not subscribed.
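If you want those failures to be at least visible, you can subscribe to that event at startup. A minimal sketch, assuming logging to the console is sufficient:

// Subscribe at startup, so exceptions from fire-and-forget tasks are at
// least logged when the faulted tasks are finalized (the timing of which
// is non-deterministic).
TaskScheduler.UnobservedTaskException += (sender, e) =>
{
    Console.WriteLine("Unobserved task exception: " + e.Exception);
    e.SetObserved(); // Mark the exception as observed
};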
2. You are consuming the blockingCollection inefficiently, with TryTake and a Thread.Sleep(2000), instead of the preferable GetConsumingEnumerable method.
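A consumer loop written over GetConsumingEnumerable blocks until an item is available and ends when the collection is marked complete for adding, so no polling or sleeping is needed. A rough sketch:

// The foreach exits when blockingCollection.CompleteAdding() is called
// and the collection has been drained.
foreach (Classes.Job job in blockingCollection.GetConsumingEnumerable())
{
    try
    {
        job.RunWorker();
    }
    catch (Exception ex)
    {
        Common.WriteErrorLog(Common.LogType.Worker, "Consumer", ex.Message);
    }
}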
3. Your code looks like an attempt to reinvent the ActionBlock<T> component of the TPL Dataflow library (which is embedded in the .NET Core/.NET 5+ platforms). This component is not perfect, but it's way better than what you've come up with so far. Even consuming with a Parallel.ForEach loop, passing as source the Partitioner.Create(blockingCollection.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering), would probably be preferable.
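That Parallel.ForEach alternative could look roughly like this (a sketch assuming the original synchronous RunWorker(), and reusing NumberOfConcurrentJobs from your code):

// The NoBuffering option prevents the partitioner from pulling items
// ahead of the workers, which matters when a single job can take hours.
Parallel.ForEach(
    Partitioner.Create(blockingCollection.GetConsumingEnumerable(),
        EnumerablePartitionerOptions.NoBuffering),
    new ParallelOptions { MaxDegreeOfParallelism = NumberOfConcurrentJobs },
    job => job.RunWorker());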