Search code examples
c# concurrency autoresetevent

C# concurrent: Is it a good idea to use many AutoResetEvent?


Suppose there are many threads calling Do(), and only one worker thread handles the actual job.

// Entry point called by the many producer threads: hands the job to the shared queue.
void Do(Job job)
{
    concurrentQueue.Enqueue(job);
    // wait for job done
    // (placeholder: this snippet does not yet block until the job has been processed)
}

// Body of the single consumer thread: polls the shared queue forever and
// processes each job it manages to dequeue.
void workerThread()
{
    for (;;)
    {
        Job next;
        bool dequeued = concurrentQueue.TryDequeue(out next);
        if (!dequeued)
        {
            continue; // queue is empty right now; poll again
        }

        // do job
    }
}

Do() should not return until the job is done, so I wrote the following code:

// Queue entry that pairs a job with the event used to signal that it has been processed.
// NOTE(review): the name collides with System.Threading.Tasks.Task if that namespace is imported.
class Task 
{
    public Job job;            // the work item to process
    public AutoResetEvent ev;  // set by the worker after the job is done (see workerThread)
}

// Enqueues the job together with a fresh, unsignaled AutoResetEvent and blocks
// the calling thread until that event is set. The event is disposed as soon as
// the wait returns, so each call owns exactly one short-lived event.
void Do(Job job)
{
    using (var ev = new AutoResetEvent(false))
    {
        // Fixed: the original line had an unbalanced extra ')' and did not compile.
        concurrentQueue.Enqueue(new Task { job = job, ev = ev });
        ev.WaitOne(); // returns once the event has been signaled
    }
}

// Body of the single consumer thread: polls the shared queue forever; for each
// dequeued task it processes the job and then signals the task's event.
void workerThread()
{
    for (;;)
    {
        Task pending;
        bool dequeued = concurrentQueue.TryDequeue(out pending);
        if (!dequeued)
        {
            continue; // nothing queued right now; poll again
        }

        // do job
        pending.ev.Set(); // signal this task's event so its waiter can proceed
    }
}

After some tests I found that it works as expected. However, I'm not sure whether allocating so many AutoResetEvents is a good idea, or whether there is a better way to accomplish this.


Solution

  • Since all clients must wait for a single thread to do the job, there is no real need for a queue. So I suggest using the Monitor class instead, and specifically its Wait/Pulse functionality. It is a bit low-level and verbose, though.

    /// <summary>
    /// Runs submitted jobs one at a time on a single dedicated background thread.
    /// Callers of <see cref="DoWork"/> block until their job has been executed.
    /// </summary>
    /// <remarks>
    /// Hand-off between client and worker uses Monitor.Wait/Pulse on
    /// <c>_innerLock</c>. Every wait is guarded by an explicit state flag so that a
    /// pulse sent before the other side started waiting is never lost (the
    /// flag-less version could deadlock if DoWork or Dispose ran before the worker
    /// thread reached its first Wait).
    /// </remarks>
    class Worker<TResult> : IDisposable
    {
        private readonly object _outerLock = new object(); // serializes clients
        private readonly object _innerLock = new object(); // guards all fields below
        private Func<TResult> _currentJob;
        private TResult _currentResult;
        private Exception _currentException;
        private bool _jobPending;   // set by the client, cleared by the worker
        private bool _jobCompleted; // set by the worker, cleared by the client
        private bool _disposed;

        /// <summary>Creates the worker and starts its background thread.</summary>
        public Worker()
        {
            var thread = new Thread(MainLoop);
            thread.IsBackground = true;
            thread.Start();
        }

        // Worker thread body: waits for a pending job, runs it, publishes the
        // outcome, and repeats until the worker is disposed.
        private void MainLoop()
        {
            lock (_innerLock)
            {
                while (true)
                {
                    // Condition loop: re-check the flags after every wake-up, and
                    // skip waiting entirely if a request arrived before we got here.
                    while (!_jobPending && !_disposed)
                    {
                        Monitor.Wait(_innerLock); // Wait for client requests
                    }
                    if (_disposed)
                    {
                        break;
                    }

                    try
                    {
                        _currentResult = _currentJob.Invoke();
                        _currentException = null;
                    }
                    catch (Exception ex)
                    {
                        // Captured here and rethrown on the client thread by DoWork.
                        _currentException = ex;
                        _currentResult = default;
                    }

                    _jobPending = false;
                    _jobCompleted = true;
                    Monitor.Pulse(_innerLock); // Notify the waiting client that the job is done
                }
            } // We are done
        }

        /// <summary>
        /// Executes <paramref name="job"/> on the worker thread and blocks until it finishes.
        /// </summary>
        /// <param name="job">The function to run on the worker thread.</param>
        /// <returns>The value returned by <paramref name="job"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="job"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">
        /// The worker has been disposed. This type derives from
        /// InvalidOperationException, so existing catch blocks keep working.
        /// </exception>
        /// <remarks>Any exception thrown by the job is rethrown here with its original stack trace.</remarks>
        public TResult DoWork(Func<TResult> job)
        {
            if (job == null)
            {
                throw new ArgumentNullException(nameof(job));
            }

            TResult result;
            Exception exception;
            lock (_outerLock) // Accept only one client at a time
            {
                lock (_innerLock) // Acquire inner lock
                {
                    if (_disposed)
                    {
                        throw new ObjectDisposedException(GetType().Name);
                    }

                    _currentJob = job;
                    _jobCompleted = false;
                    _jobPending = true;
                    Monitor.Pulse(_innerLock); // Notify worker thread about the new job

                    // Condition loop: only proceed once the worker has published the outcome.
                    while (!_jobCompleted)
                    {
                        Monitor.Wait(_innerLock); // Wait for worker thread to process the job
                    }

                    result = _currentResult;
                    exception = _currentException;

                    // Clean up so no references outlive the call
                    _currentJob = null;
                    _currentResult = default;
                    _currentException = null;
                    _jobCompleted = false;
                }
            }

            // Throw the exception, if occurred, preserving the stack trace
            if (exception != null)
            {
                ExceptionDispatchInfo.Capture(exception).Throw();
            }
            return result;
        }

        /// <summary>
        /// Marks the worker as disposed and tells the worker thread to exit its loop.
        /// Waits for any in-flight <see cref="DoWork"/> call to finish first.
        /// </summary>
        public void Dispose()
        {
            lock (_outerLock)
            {
                lock (_innerLock)
                {
                    _disposed = true;
                    Monitor.Pulse(_innerLock); // Notify worker thread to exit loop
                }
            }
        }
    }
    

    Usage example:

    // The using block disposes the worker even if DoWork throws.
    using (var worker = new Worker<int>())
    {
        int result = worker.DoWork(() => 1); // Accepts a function as argument
        Console.WriteLine($"Result: {result}");
    }
    

    Output:

    Result: 1
    

    Update: The previous solution is not await-friendly, so here is one that allows proper awaiting. It uses a TaskCompletionSource for each job, stored in a BlockingCollection.

    /// <summary>
    /// Runs submitted jobs one at a time, in submission order, on a single
    /// dedicated background thread. Each job is represented by a
    /// TaskCompletionSource whose task completes when the job has run.
    /// </summary>
    class Worker<TResult> : IDisposable
    {
        // The job delegate travels as the AsyncState of each TaskCompletionSource's task.
        private readonly BlockingCollection<TaskCompletionSource<TResult>> _blockingCollection
            = new BlockingCollection<TaskCompletionSource<TResult>>();

        /// <summary>Creates the worker and starts its background thread.</summary>
        public Worker()
        {
            var thread = new Thread(MainLoop);
            thread.IsBackground = true;
            thread.Start();
        }

        // Worker thread body: consumes queued jobs until adding is completed and
        // the collection is drained.
        private void MainLoop()
        {
            foreach (var tcs in _blockingCollection.GetConsumingEnumerable())
            {
                var job = (Func<TResult>)tcs.Task.AsyncState;
                TResult result;
                try
                {
                    result = job.Invoke();
                }
                catch (Exception ex)
                {
                    // Surface the failure through the task instead of killing the worker thread.
                    tcs.TrySetException(ex);
                    continue;
                }
                tcs.TrySetResult(result);
            }
        }

        /// <summary>Queues <paramref name="job"/> for execution on the worker thread.</summary>
        /// <param name="job">The function to run on the worker thread.</param>
        /// <returns>A task that completes with the job's result, or faults with the job's exception.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="job"/> is null.</exception>
        /// <exception cref="InvalidOperationException">The worker has been disposed (adding is completed).</exception>
        public Task<TResult> DoWorkAsync(Func<TResult> job)
        {
            if (job == null)
            {
                throw new ArgumentNullException(nameof(job));
            }

            // RunContinuationsAsynchronously keeps awaiting callers' continuations
            // off the worker thread.
            var tcs = new TaskCompletionSource<TResult>(job,
                TaskCreationOptions.RunContinuationsAsynchronously);
            _blockingCollection.Add(tcs);
            return tcs.Task;
        }

        /// <summary>Synchronous variant: queues the job and blocks until it has run.</summary>
        /// <param name="job">The function to run on the worker thread.</param>
        /// <returns>The value returned by <paramref name="job"/>.</returns>
        public TResult DoWork(Func<TResult> job) // Synchronous call
        {
            // GetResult() rethrows the job's original exception (not an
            // AggregateException) with its stack trace preserved, so no
            // swallow-and-rethrow dance is needed.
            return DoWorkAsync(job).GetAwaiter().GetResult();
        }

        /// <summary>
        /// Stops accepting new jobs. Jobs already queued are still processed, after
        /// which the worker thread exits.
        /// </summary>
        public void Dispose()
        {
            _blockingCollection.CompleteAdding();
        }
    }
    

    Usage example

    // The using block disposes the worker even if the awaited job throws.
    using (var worker = new Worker<int>())
    {
        int result = await worker.DoWorkAsync(() => 1); // Accepts a function as argument
        Console.WriteLine($"Result: {result}");
    }
    

    Output:

    Result: 1