I'm facing a challenge in ensuring that a WebSocket message is sent before the task that owns that WebSocket is completed.
I have a Main method, where the core logic resides, including starting a WebSocket connection and streaming from an IP camera. The task is created and stored in a dictionary with a CancellationTokenSource to allow for task cancellation. The cancellation token is also passed to the Main method.
public async Task Main(System.Windows.Controls.Image imageControl, int cameraIndex, CancellationToken token, string cameraUrl)
{
// ... Initialize WebSocket and other operations
while (!token.IsCancellationRequested)
{
// ... Fetch and display video frames
}
if (token.IsCancellationRequested)
{
var Stop_Server = new
{
command = "Stop_Stream"
};
await SendSocket(cameraIndex, Stop_Server, token);
}
}
private async Task IPCameraMethod()
{
// ... some other logic
if (tokenSources.TryGetValue(cameraIndex, out var tuple))
{
tuple.Item2.Cancel();
try
{
await tuple.Item3; // Await the task's completion
}
catch (OperationCanceledException)
{
// Task was cancelled, expected behavior
}
}
// Now that the old task has been cancelled and awaited, start the new task.
var newCts = new CancellationTokenSource();
var newTask = Camera.IPCameraService.Main(SourceImageControl, selectedItemIndex, newCts.Token, url);
Debug.WriteLine("New Task Created!");
tokenSources[cameraIndex] = Tuple.Create(selectedItemIndex, newCts, newTask);
}
As soon as the line await tuple.Item3; is executed, the WebSocket in the Main method is getting disconnected, even before the SendSocket function is called to send the "Stop_Stream" message.
How can I ensure that the WebSocket message is sent before the WebSocket is closed due to task completion?
Essentially the IPCameraMethod
should wait for the Main
task to complete before closing it and hence closing the websocket connection, but it seems that it closes immediately.
These are the output im getting:
Exception thrown: 'System.OperationCanceledException' in mscorlib.dll
ReceiveSocket operation was cancelled.
Exception thrown: 'System.ArgumentNullException' in OpenCvSharp.dll
Exception caught while processing: Value cannot be null.
Parameter name: buf
Details: System.ArgumentNullException: Value cannot be null.
Parameter name: buf
at OpenCvSharp.Cv2.ImDecode(Byte[] buf, ImreadModes flags)
at Iris.Services.IPCameraService.<RunYolo>d__74.MoveNext() in C:\Users\nico_\Desktop\Projects\Iris\Iris WPF\Iris Y\Iris-Latest\Iris\Services\IPCameraService.cs:line 317
private async Task IPCameraMethod()
{
if (string.IsNullOrWhiteSpace(textBoxUrl.Text) ||
!int.TryParse(SourceComboBox.Name.Replace("comboBox", ""), out int cameraIndex) ||
comboBoxSavedCameras.SelectedIndex < 0)
{
return;
}
string url = textBoxUrl.Text;
int selectedItemIndex = comboBoxSavedCameras.SelectedIndex;
if (tokenSources.TryGetValue(cameraIndex, out var tuple))
{
if (selectedItemIndex != tuple.Item1)
{
tuple.Item2.Cancel();
try
{
await tuple.Item3; // Await the task's completion
}
catch (OperationCanceledException)
{
// Task was cancelled, expected behavior
}
tuple.Item2.Dispose();
tokenSources.Remove(cameraIndex);
}
else
{
return; // If selected item is the same, we do not need to create a new task
}
}
// Now that the old task has been cancelled and awaited, start the new task.
var newCts = new CancellationTokenSource();
var newTask = Camera.IPCameraService.Main(SourceImageControl, selectedItemIndex, newCts.Token, url);
Debug.WriteLine("New Task Created!");
tokenSources[cameraIndex] = Tuple.Create(selectedItemIndex, newCts, newTask);
}
public async Task Main(System.Windows.Controls.Image imageControl, int cameraIndex, CancellationToken token, string cameraUrl)
{
CameraURLs[cameraIndex] = cameraUrl;
await StartSocket(cameraIndex, token);
await StartStream(cameraIndex, cameraUrl, token);
var stopStreamCompletion = new TaskCompletionSource<bool>();
EventHandler(cameraUrl, cameraIndex, token);
while (!token.IsCancellationRequested) // Added a token check to allow stopping the loop externally
{
var frame = await RunYolo(cameraIndex, token);
if (frame != null)
await UpdateDisplay(frame, imageControl, token);
}
// Close WebSocket when CancellationToken is triggered
if (token.IsCancellationRequested)
{
var Stop_Server = new
{
command = "Stop_Stream"
};
await SendSocket(cameraIndex, Stop_Server, token);
Debug.WriteLine("Websocket Cleared!");
}
}
public async Task StartStream(int cameraIndex, string cameraUrl, CancellationToken token)
{
try
{
// Check for cancellation before sending the request
token.ThrowIfCancellationRequested();
var settings = new
{
command = "Start_Stream",
camera_url = cameraUrl
};
await SendSocket(cameraIndex, settings, token); // Pass the cancellation token to SendSocket
}
catch (OperationCanceledException)
{
// Handle cancellation
Debug.WriteLine("StartStream was cancelled.");
// If needed, you can add additional logic here to clean up or log.
return;
}
catch (System.Net.WebSockets.WebSocketException)
{
// Existing logic
Console.WriteLine("WebSocketException occurred while starting Stream. Retrying...");
await Task.Delay(1000); // Wait for a second before retrying
}
}
private async Task<Mat> RunYolo(int cameraIndex, CancellationToken token)
{
try
{
// Check for cancellation before doing any work
token.ThrowIfCancellationRequested();
// Run your operations
byte[] imageBytes = await ReceiveSocket(cameraIndex, token);
// Convert bytes to Mat
Mat frame = Cv2.ImDecode(imageBytes, ImreadModes.Color);
return frame;
}
catch (OperationCanceledException)
{
// Handle cancellation here, logging or other side-effects if needed
Debug.WriteLine("RunYolo operation was cancelled.");
return null;
}
catch (Exception ex)
{
// Log or handle other types of exceptions as needed
Console.WriteLine($"Exception caught while processing: {ex.Message}");
Console.WriteLine($"Details: {ex}");
return null;
}
}
public async Task<byte[]> ReceiveSocket(int cameraIndex, CancellationToken token)
{
try
{
// Check for cancellation before receiving
token.ThrowIfCancellationRequested();
var ws = CameraWebsockets[cameraIndex];
List<byte> fullMessage = new List<byte>();
WebSocketReceiveResult result;
do
{
var messageReceived = new ArraySegment<byte>(new byte[4096]);
result = await ws.ReceiveAsync(messageReceived, token);
fullMessage.AddRange(messageReceived.Skip(messageReceived.Offset).Take(result.Count));
} while (!result.EndOfMessage); // Continue reading until you get the entire message.
return fullMessage.ToArray();
}
catch (OperationCanceledException)
{
// Handle cancellation by logging or other side-effects if needed
Debug.WriteLine("ReceiveSocket operation was cancelled.");
return null;
}
catch (System.Net.WebSockets.WebSocketException)
{
await StartSocket(cameraIndex, token);
await StartStream(cameraIndex, CameraURLs[cameraIndex],token);
return await ReceiveSocket(cameraIndex,token);
}
}
public async Task SendSocket(int cameraIndex, object message, CancellationToken token)
{
try
{
token.ThrowIfCancellationRequested();
var ws = CameraWebsockets[cameraIndex];
var Start_ServerJson = JsonConvert.SerializeObject(message);
var Start_ServerBytes = Encoding.UTF8.GetBytes(Start_ServerJson);
await ws.SendAsync(new ArraySegment<byte>(Start_ServerBytes, 0, Start_ServerBytes.Length), WebSocketMessageType.Text, true, token);
}
catch (OperationCanceledException)
{
// Handle cancellation here, perhaps log it if needed
Debug.WriteLine("SendSocket operation was cancelled.");
}
catch (System.Net.WebSockets.WebSocketException)
{
// When a WebSocketException occurs, you restart the socket and stream.
// Be sure to consider the cancellation token when you're restarting as well.
await StartSocket(cameraIndex, token); // Consider passing token here too
await StartStream(cameraIndex, CameraURLs[cameraIndex], token); // And here
await SendSocket(cameraIndex, message, token); // Recursion with the same token
}
}
public async Task StartSocket(int cameraIndex, CancellationToken token)
{
while (true)
{
try
{
// Check for cancellation before trying to connect
token.ThrowIfCancellationRequested();
var ws = new ClientWebSocket();
await ws.ConnectAsync(new Uri(wsUrl), token); // Pass the cancellation token here
if (CameraWebsockets.ContainsKey(cameraIndex))
{
CameraWebsockets[cameraIndex] = ws;
}
else
{
CameraWebsockets.TryAdd(cameraIndex, ws);
}
return; // Break the loop if connected successfully
}
catch (OperationCanceledException)
{
// Handle cancellation by logging or other side-effects if needed
Debug.WriteLine("StartSocket operation was cancelled.");
return; // Exit the loop if cancelled
}
catch (System.Net.WebSockets.WebSocketException)
{
Console.WriteLine("WebSocketException occurred while starting socket. Retrying...");
await Task.Delay(1000, token); // Pass the cancellation token here too
}
}
}
private async Task UpdateDisplay(Mat frame, System.Windows.Controls.Image imageControl, CancellationToken token)
{
// Check for cancellation before updating the UI
if (token.IsCancellationRequested)
{
Debug.WriteLine("UpdateDisplay operation was cancelled.");
return;
}
await System.Windows.Application.Current.Dispatcher.InvokeAsync(() =>
{
try
{
if (frame.Empty() || frame.Width == 0 || frame.Height == 0)
{
Console.WriteLine("Received an invalid frame.");
return; // Exit and don't process further
}
var bitmap = OpenCvSharp.Extensions.BitmapConverter.ToBitmap(frame);
var bitmapSource = ConvertToBitmap(bitmap);
imageControl.Source = bitmapSource;
}
catch (Exception ex)
{
Console.WriteLine($"Failed to convert frame to bitmap. Frame Size: {frame.Size()}. Channels: {frame.Channels()}. Error: {ex.Message}");
}
});
}
The issue you're facing is due to the fact that when you call await tuple.Item3 in your IPCameraMethod, effectively awaits the completion of the Main task. When the Main task is completed (either normally or due to cancellation), it can also lead to the WebSocket being closed immediately
public async Task Main(System.Windows.Controls.Image imageControl, int cameraIndex, CancellationToken token, string cameraUrl) { // ... Initialize WebSocket and other operations
// Create a TaskCompletionSource to signal when the "Stop_Stream" message is sent
var stopStreamCompletion = new TaskCompletionSource<bool>();
while (!token.IsCancellationRequested)
{
// ... Fetch and display video frames
}
if (token.IsCancellationRequested)
{
var Stop_Server = new
{
command = "Stop_Stream"
};
// Send the "Stop_Stream" message and await its completion
await SendSocket(cameraIndex, Stop_Server, token);
// Signal that the message has been sent
stopStreamCompletion.SetResult(true);
}
// Now, await the completion of the Main task
await stopStreamCompletion.Task;
}