I expected the Connected and Disconnected handlers below to be called alternately, but they are not. The failure is not 100% reproducible, but the test sometimes fails. Repository is here.
The code attached here is simplified because the full code is long. Please refer to the repository for the reproducible example.
    public class ClientStressTest3 {
        [Fact]
        public async Task TestAsync() {
            var client = new Client();
            int openCloseDifference = 0;
            var failures = Channel.CreateUnbounded<string>();
            client.Connected += () => {
                // Use the value returned by Interlocked so the check sees this handler's own update.
                int difference = Interlocked.Increment(ref openCloseDifference);
                Debug.WriteLine("Connected: {0}", difference);
                if (Math.Abs(difference) > 1) {
                    // Failure point. Why enter here?
                    _ = failures.Writer.WriteAsync($"open close difference {difference}");
                }
            };
            client.Disconnected += () => {
                int difference = Interlocked.Decrement(ref openCloseDifference);
                Debug.WriteLine("Disconnected: {0}", difference);
                if (Math.Abs(difference) > 1) {
                    _ = failures.Writer.WriteAsync($"open close difference {difference}");
                }
            };
            var tasks = new List<Task>();
            for (int i = 0; i < 625; i++) {
                tasks.Add(Task.Run(async () => {
                    try {
                        await client.ConnectAsync().ConfigureAwait(false);
                    }
                    catch (Exception ex) { }
                }));
                tasks.Add(Task.Run(async () => {
                    try {
                        await client.CloseAsync().ConfigureAwait(false);
                    }
                    catch (Exception ex) { }
                }));
            }
            await Task.WhenAll(tasks).ConfigureAwait(false);
            while (await failures.Reader.WaitToReadAsync().ConfigureAwait(false)) {
                string failure = await failures.Reader.ReadAsync().ConfigureAwait(false);
                throw new Exception(failure);
            }
        }
    }
    class Client {
        public event Action Connected = delegate { };
        public event Action Disconnected = delegate { };
        private readonly SemaphoreSlim _connectSemaphore = new(1);
        private Task _dispatch = Task.CompletedTask;
        private WebSocket _clientSocket;
        public async Task ConnectAsync() {
            await _connectSemaphore.WaitAsync().ConfigureAwait(false);
            // _dispatch may be same among multiple threads if restored at the same time.
            try {
                await _clientSocket.ConnectAsync().ConfigureAwait(false);
                if (_dispatch != null) {
                    _dispatch = _dispatch.ContinueWith((_) => DispatchEventAsync(), TaskScheduler.Default);
                }
            }
            finally {
                _connectSemaphore.Release();
            }
        }
        private async Task DispatchEventAsync() {
            try {
                Connected();
            }
            catch (Exception exception) { }
            try {
                while (await events.WaitToReadAsync().ConfigureAwait(false)) {
                    DispatchEvent(await events.ReadAsync().ConfigureAwait(false));
                }
            }
            catch (Exception exception) { }
            try {
                Disconnected();
            }
            catch (Exception ex) { }
        }
    }
I think _connectSemaphore guards the inner code, so only one thread should execute DispatchEventAsync at a time. But it seems that two threads sometimes enter DispatchEventAsync at the same time. I can't understand this.
ContinueWith is a low-level method with dangerous default behavior (link is to my blog). In this case, your code is not behaving how you think it should because ContinueWith (like StartNew) doesn't understand asynchronous delegates.
Specifically, the task returned from ContinueWith (the same task stored in _dispatch) will complete when DispatchEventAsync asynchronously yields (i.e., hits its first await that asynchronously waits). This is likely the call to WaitToReadAsync, which is after Connected and before Disconnected. So the _dispatch task completes after Connected and before Disconnected, and the next continuation can raise Connected again before the previous Disconnected has run.
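The early completion can be observed in isolation. The following is a minimal sketch, not code from the repository: ContinueWithDemo, InnerAsync, and the gate are hypothetical names, and InnerAsync merely stands in for a method like DispatchEventAsync that awaits something not yet complete.

```csharp
using System;
using System.Threading.Tasks;

public static class ContinueWithDemo
{
    // Hypothetical stand-in for DispatchEventAsync: it yields at its first
    // await because the gate task has not completed yet.
    private static async Task InnerAsync(Task gate)
    {
        await gate.ConfigureAwait(false);
    }

    public static async Task<(bool OuterDone, bool InnerDone, bool UnwrappedDone)> RunAsync()
    {
        var gate = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        // The delegate returns a Task, so ContinueWith produces a Task<Task>.
        Task<Task> outer = Task.CompletedTask.ContinueWith(
            _ => InnerAsync(gate.Task), TaskScheduler.Default);

        // Unwrap gives a proxy task that follows the inner task instead.
        Task unwrapped = outer.Unwrap();

        // The outer task completes as soon as the delegate returns,
        // which happens while InnerAsync is still waiting on the gate.
        Task inner = await outer.ConfigureAwait(false);
        var snapshot = (outer.IsCompleted, inner.IsCompleted, unwrapped.IsCompleted);

        // Let the inner work finish, then wait for it through the proxy.
        gate.SetResult(true);
        await unwrapped.ConfigureAwait(false);
        return snapshot;
    }
}
```

Under these assumptions, the snapshot reports the outer task as completed while both the inner task and the unwrapped proxy are still pending. Storing the outer task in a field like _dispatch therefore lets the next continuation start too early.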
Ideally, you should avoid ContinueWith completely. Sometimes a local asynchronous method helps, e.g.:
    public async Task ConnectAsync() {
        await _connectSemaphore.WaitAsync().ConfigureAwait(false);
        try {
            await _clientSocket.ConnectAsync().ConfigureAwait(false);
            if (_dispatch != null) {
                _dispatch = ChainAsync(_dispatch);
            }
        }
        finally {
            _connectSemaphore.Release();
        }
        // Not static: the local function calls the instance method DispatchEventAsync.
        async Task ChainAsync(Task dispatch)
        {
            await dispatch;
            await DispatchEventAsync();
        }
    }
If you do want to continue using ContinueWith for some reason, then you can use Unwrap:
    public async Task ConnectAsync() {
        await _connectSemaphore.WaitAsync().ConfigureAwait(false);
        try {
            await _clientSocket.ConnectAsync().ConfigureAwait(false);
            if (_dispatch != null) {
                _dispatch = _dispatch.ContinueWith(_ => DispatchEventAsync(), TaskScheduler.Default)
                    .Unwrap();
            }
        }
        finally {
            _connectSemaphore.Release();
        }
    }
Using either of these approaches, the _dispatch task will now complete at the end of DispatchEventAsync.
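To illustrate the resulting ordering, here is a small sketch of the local-method chaining pattern on its own. ChainDemo, its log list, and the Task.Yield call (standing in for the event-reading loop) are assumptions made for the example and are not part of the original Client.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ChainDemo
{
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        Task dispatch = Task.CompletedTask;

        // Chain three dispatch rounds, the way repeated ConnectAsync calls would.
        for (int i = 0; i < 3; i++)
        {
            dispatch = ChainAsync(dispatch, i);
        }

        await dispatch.ConfigureAwait(false);
        return log;

        // Each round first awaits the whole previous round, so rounds run
        // strictly one after another and the log needs no extra locking.
        async Task ChainAsync(Task previous, int id)
        {
            await previous.ConfigureAwait(false);
            log.Add($"Connected {id}");
            await Task.Yield(); // hypothetical stand-in for the event loop
            log.Add($"Disconnected {id}");
        }
    }
}
```

Because the task returned by ChainAsync only completes after the "Disconnected" entry is written, each "Connected" entry in the log is followed by its matching "Disconnected" entry before the next round begins.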