Suppose there are many threads calling Do()
, and only one worker thread handles the actual job.
void Do(Job job)
{
concurrentQueue.Enqueue(job);
// wait for job done
}
void workerThread()
{
while (true)
{
Job job;
if (concurrentQueue.TryDequeue(out job))
{
// do job
}
}
}
The Do() should wait until the job done before return. So I wrote the following code:
class Task
{
public Job job;
public AutoResetEvent ev;
}
void Do(Job job)
{
using (var ev = new AutoResetEvent(false))
{
concurrentQueue.Enqueue(new Task { job = job, ev = ev }));
ev.WaitOne();
}
}
void workerThread()
{
while (true)
{
Task task;
if (concurrentQueue.TryDequeue(out task))
{
// do job
task.ev.Set();
}
}
}
After some tests I found it works as expected. However I'm not sure is it a good way to allocate many AutoResetEvents, or is there a better way to accomplish?
Since all clients must wait a single thread to do the job, there is no real need for using a queue. So I suggest to use the Monitor
class instead, and specifically the Wait/Pulse functionality. It is a bit low level and verbose though.
class Worker<TResult> : IDisposable
{
private readonly object _outerLock = new object();
private readonly object _innerLock = new object();
private Func<TResult> _currentJob;
private TResult _currentResult;
private Exception _currentException;
private bool _disposed;
public Worker()
{
var thread = new Thread(MainLoop);
thread.IsBackground = true;
thread.Start();
}
private void MainLoop()
{
lock (_innerLock)
{
while (true)
{
Monitor.Wait(_innerLock); // Wait for client requests
if (_disposed) break;
try
{
_currentResult = _currentJob.Invoke();
_currentException = null;
}
catch (Exception ex)
{
_currentException = ex;
_currentResult = default;
}
Monitor.Pulse(_innerLock); // Notify the waiting client that the job is done
}
} // We are done
}
public TResult DoWork(Func<TResult> job)
{
TResult result;
Exception exception;
lock (_outerLock) // Accept only one client at a time
{
lock (_innerLock) // Acquire inner lock
{
if (_disposed) throw new InvalidOperationException();
_currentJob = job;
Monitor.Pulse(_innerLock); // Notify worker thread about the new job
Monitor.Wait(_innerLock); // Wait for worker thread to process the job
result = _currentResult;
exception = _currentException;
// Clean up
_currentJob = null;
_currentResult = default;
_currentException = null;
}
}
// Throw the exception, if occurred, preserving the stack trace
if (exception != null) ExceptionDispatchInfo.Capture(exception).Throw();
return result;
}
public void Dispose()
{
lock (_outerLock)
{
lock (_innerLock)
{
_disposed = true;
Monitor.Pulse(_innerLock); // Notify worker thread to exit loop
}
}
}
}
Usage example:
var worker = new Worker<int>();
int result = worker.DoWork(() => 1); // Accepts a function as argument
Console.WriteLine($"Result: {result}");
worker.Dispose();
Output:
Result: 1
Update: The previous solution is not await-friendly, so here is one that allows proper awaiting. It uses a TaskCompletionSource
for each job, stored in a BlockingCollection
.
class Worker<TResult> : IDisposable
{
private BlockingCollection<TaskCompletionSource<TResult>> _blockingCollection
= new BlockingCollection<TaskCompletionSource<TResult>>();
public Worker()
{
var thread = new Thread(MainLoop);
thread.IsBackground = true;
thread.Start();
}
private void MainLoop()
{
foreach (var tcs in _blockingCollection.GetConsumingEnumerable())
{
var job = (Func<TResult>)tcs.Task.AsyncState;
try
{
var result = job.Invoke();
tcs.SetResult(result);
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
}
}
public Task<TResult> DoWorkAsync(Func<TResult> job)
{
var tcs = new TaskCompletionSource<TResult>(job,
TaskCreationOptions.RunContinuationsAsynchronously);
_blockingCollection.Add(tcs);
return tcs.Task;
}
public TResult DoWork(Func<TResult> job) // Synchronous call
{
var task = DoWorkAsync(job);
try { task.Wait(); } catch { } // Swallow the AggregateException
// Throw the original exception, if occurred, preserving the stack trace
if (task.IsFaulted) ExceptionDispatchInfo.Capture(task.Exception.InnerException).Throw();
return task.Result;
}
public void Dispose()
{
_blockingCollection.CompleteAdding();
}
}
Usage example
var worker = new Worker<int>();
int result = await worker.DoWorkAsync(() => 1); // Accepts a function as argument
Console.WriteLine($"Result: {result}");
worker.Dispose();
Output:
Result: 1