I build an extension method for WaitHandles (especially ManualResetEventSlim) to make them usable in async code. Usage looks like this:
public class WaitHandleExampleClass : IDisposable
{
private readonly ManualResetEventSlim _mre;
public WaitHandleExampleClass()
{
_mre = new ManualResetEventSlim(false);
}
public async Task WaitForHandle()
{
using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
{
var cancelled = await _mre.WaitHandle.WaitForSignalOrCancelAsync(cts.Token)
.ConfigureAwait(false);
if (cancelled)
{
DoCancelAction();
return;
}
Proceed();
// later, _mre.Reset() is called
}
}
public void TriggerWaitHandleFromOtherThread()
{
_mre.Set();
}
public void Dispose()
{
_mre.Dispose();
}
}
My Extension method WaitForSignalOrCancelAsync() looks like this:
public static Task<bool> WaitForSignalOrCancelAsync(this WaitHandle waitHandle, CancellationToken ct)
{
if (waitHandle == null)
throw new ArgumentNullException(nameof(waitHandle));
var tcs = new TaskCompletionSource<bool>();
var task = tcs.Task;
var container = new CleanupContainer();
// dont pass ct to ContinueWith - otherwise rwh is not unregistered (even with TaskContinuationOptions.None)
task.ContinueWith(TaskContinueWith, container, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
container.Ctr = ct.Register(SetCancel, tcs, false);
container.Rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle, RwhCallback, tcs, -1, true);
return task;
}
private static void SetCancel(object state) => ((TaskCompletionSource<bool>)state).TrySetResult(true);
private static void RwhCallback(object state, bool o) => ((TaskCompletionSource<bool>)state).TrySetResult(false);
private static void TaskContinueWith(Task t, object state)
{
try
{
((CleanupContainer)state).Cleanup();
}
catch (Exception e)
{
DebugAssert.ShouldNotBeCalled();
}
}
private class CleanupContainer
{
public RegisteredWaitHandle Rwh { get; set; }
public CancellationTokenRegistration Ctr { get; set; }
public void Cleanup()
{
// so this just waits until the condition is true (returns true) or timeout is reached (returns false)
var conditionOk = SpinWait.WaitFor(() => Rwh != null && Ctr != default, timeoutMs: 200);
DebugAssert.MustBeTrue(conditionOk); // always true in debug -> Rwh and Ctr are set
var rwhUnregisterOkay = Rwh?.Unregister(null);
DebugAssert.MustBeTrue(rwhUnregisterOkay == true); // also always true
Ctr.Dispose();
Rwh = null;
Ctr = default;
}
}
Atfer deployment and running this code as Windows service for multiple hours, i can see that memory usage increases. Also CPU usages increases but i am not sure if this is related. So i attached dotmemory. Over time, i get more and more objects of:
It does not help if i force GC with dotmemory
I guess this is related to the extension method above, but i dont get why. I took this MSDN code as base ("From Wait Handles to TAP"): https://learn.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/interop-with-other-asynchronous-patterns-and-types#see-also
I also wrote my first unit test with dotmemory integration, not sure if its correct or helpful. But with that i know that Rwh.Unregister() makes a different as the object count different if its commented out in the Cleanup method. Its red because registeredWaitHandles.ObjectsCount == 1, not 0
[TestMethod]
[DotMemoryUnit(FailIfRunWithoutSupport = false)]
public async Task WaitForSignalOrCancelAsync_ResourcesAreReleased()
{
using (var waitHandle = new ManualResetEventSlim(false))
{
var cancellationTokenSource = new CancellationTokenSource();
var task = waitHandle.WaitHandle.WaitForSignalOrCancelAsync(cancellationTokenSource.Token);
cancellationTokenSource.Cancel();
await task;
for (var i = 0; i < 10; i++)
{
GC.Collect();
GC.WaitForPendingFinalizers();
}
Thread.Sleep(200);
for (var i = 0; i < 10; i++)
{
GC.Collect();
GC.WaitForPendingFinalizers();
}
dotMemory.Check(memory =>
{
var registeredWaitHandles = memory.GetObjects(where => where.Type.Is<RegisteredWaitHandle>());
// if i comment out the Rwh.Unregister() in CleanupContainer.Cleanup() i get 2 as count here
// with Rwh.Unregister(), i get 1 here
registeredWaitHandles.ObjectsCount.MustBeEqualTo(0, "rwh > 1");
var cancellationTokenRegistrations = memory.GetObjects(where => where.Type.Is<CancellationTokenRegistration>());
cancellationTokenRegistrations.ObjectsCount.MustBeEqualTo(0, "ctr > 0");
});
}
}
Edit: I tried the solution from Ivan (stackoverflow link) but i run into the same problem. My unit test is still red with 2 remaining references I briefly checked it on the server, after 20 minutes i see increasing new objects of the 3 types (TaskCompletionSource, ...)
I quickly tried AsyncEx, but with that my unit test is also red with that
Code i tried:
public static async Task<bool> WaitForSignalOrCancelAsync(this WaitHandle waitHandle, CancellationToken cancellationToken, int timeoutMilliseconds = Timeout.Infinite)
{
try
{
await waitHandle.WaitOneAsync(cancellationToken, timeoutMilliseconds).ConfigureAwait(false);
return false; // no cancel
}
catch (OperationCanceledException)
{
return true; // cancel
}
}
private static Task WaitOneAsync(this WaitHandle waitHandle, CancellationToken cancellationToken, int timeoutMilliseconds = Timeout.Infinite)
{
if (waitHandle == null)
throw new ArgumentNullException(nameof(waitHandle));
TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
CancellationTokenRegistration ctr = cancellationToken.Register(() => tcs.TrySetCanceled());
TimeSpan timeout = timeoutMilliseconds > Timeout.Infinite ? TimeSpan.FromMilliseconds(timeoutMilliseconds) : Timeout.InfiniteTimeSpan;
RegisteredWaitHandle rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle,
(_, timedOut) =>
{
if (timedOut)
{
tcs.TrySetCanceled();
}
else
{
tcs.TrySetResult(true);
}
},
null, timeout, true);
Task<bool> task = tcs.Task;
_ = task.ContinueWith(_ =>
{
var ok = rwh.Unregister(null);
var ok2 = rwh.Unregister(waitHandle);
ctr.Dispose();
}, CancellationToken.None);
return task;
}
Try a bit different approach - do not rely on task.ContinueWith.
Also SpinWait.WaitFor
in your code is a bit odd, the need for it shows that you have race condition issues.
Try the following code:
public static class WaitHandleExtensions
{
public static Task<bool> WaitForSignalOrCancelAsync(this WaitHandle waitHandle, CancellationToken ct)
{
ArgumentNullException.ThrowIfNull(waitHandle);
if (ct.IsCancellationRequested)
return Task.FromResult(true);
if (waitHandle.WaitOne(0))
return Task.FromResult(false);
var op = new WaitForSignalOrCancelOperation(waitHandle, ct);
return op.Task;
}
private sealed class WaitForSignalOrCancelOperation
{
private const int State_Awaiting = 0, State_Completed = 1;
private static readonly WaitOrTimerCallback _onWaitCompleteCallback;
private static readonly Action<object?> _onCancelCallback;
private readonly TaskCompletionSource<bool> _tcs = new();
private volatile int _state = State_Awaiting;
private CancellationTokenRegistration _ctr;
private RegisteredWaitHandle? _rwh;
public Task<bool> Task
{
get => _tcs.Task;
}
static WaitForSignalOrCancelOperation()
{
_onWaitCompleteCallback = static (object? state, bool timedOut) =>
{
var self = (WaitForSignalOrCancelOperation)state!;
self.HandleWaitComplete();
};
_onCancelCallback = static (object? state) =>
{
var self = (WaitForSignalOrCancelOperation)state!;
self.HandleCanceled();
};
}
public WaitForSignalOrCancelOperation(WaitHandle waitHandle, CancellationToken ct)
{
if (ct.CanBeCanceled)
{
_ctr = ct.Register(_onCancelCallback, this, useSynchronizationContext: false);
if (_state != State_Awaiting)
{
// CancellationToken is already canceled.
// _onCancelCallback has been called already, the result is already set.
UnregisterAll();
return;
}
}
_rwh = ThreadPool.RegisterWaitForSingleObject(
waitHandle,
_onWaitCompleteCallback,
state: this,
Timeout.Infinite,
executeOnlyOnce: true);
if (_state != State_Awaiting && _rwh != null)
{
// waitHandle is already signaled.
// _onWaitCompleteCallback has been called already, the result is already set.
UnregisterAll();
}
}
private void HandleWaitComplete()
{
// Check if already called and mark as called if not.
if (Interlocked.Exchange(ref _state, State_Completed) == State_Awaiting)
{
UnregisterAll();
_tcs.TrySetResult(false);
}
}
private void HandleCanceled()
{
// Check if already called and mark as called if not.
if (Interlocked.Exchange(ref _state, State_Completed) == State_Awaiting)
{
UnregisterAll();
_tcs.TrySetResult(true);
}
}
private void UnregisterAll()
{
// Unregister and clear CancellationTokenRegistration
try
{
_ctr.Unregister();
_ctr = default;
}
catch (ObjectDisposedException)
{
// Ignore.
}
catch (Exception)
{
// Ignore or log, whatever...
}
// Unregister and clear RegisteredWaitHandle
if (_rwh != null)
{
try
{
_rwh.Unregister(null);
_rwh = null;
}
catch (ObjectDisposedException)
{
// Ignore.
}
catch (Exception)
{
// Ignore or log, whatever...
}
}
}
}
}
Pay attention that testing framework may also use RegisteredWaitHandle
by itself, so registeredWaitHandles.ObjectsCount
may be grater than zero even if you don't use WaitForSignalOrCancelAsync
at all.