Search code examples
c#wpfwebsocketasync-awaitcancellation-token

Unable to Send WebSocket Message Before Task Completion in C# Async Method


I'm facing a challenge in ensuring that a WebSocket message is sent before the task that owns that WebSocket is completed.

I have a Main method, where the core logic resides, including starting a WebSocket connection and streaming from an IP camera. The task is created and stored in a dictionary with a CancellationTokenSource to allow for task cancellation. The cancellation token is also passed to the Main method.

Here's a simplified version of my Main method:

public async Task Main(System.Windows.Controls.Image imageControl, int cameraIndex, CancellationToken token, string cameraUrl)
{
    // ... Initialize WebSocket and other operations
    while (!token.IsCancellationRequested)
    {
        // ... Fetch and display video frames
    }
    if (token.IsCancellationRequested)
    {
        var Stop_Server = new
        {
            command = "Stop_Stream"
        };
        await SendSocket(cameraIndex, Stop_Server, token);
    }
}

The problem arises when I try to cancel this task using the following method:

private async Task IPCameraMethod()
    {
        // ... some other logic
        if (tokenSources.TryGetValue(cameraIndex, out var tuple))
        {
            tuple.Item2.Cancel();
            try
            {
                await tuple.Item3;  // Await the task's completion
            }
            catch (OperationCanceledException)
            {
                // Task was cancelled, expected behavior
            }
        }
        // Now that the old task has been cancelled and awaited, start the new task.
        var newCts = new CancellationTokenSource();
        var newTask = Camera.IPCameraService.Main(SourceImageControl, selectedItemIndex, newCts.Token, url);

        Debug.WriteLine("New Task Created!");
        tokenSources[cameraIndex] = Tuple.Create(selectedItemIndex, newCts, newTask);
    }

As soon as the line await tuple.Item3; is executed, the WebSocket in the Main method is getting disconnected, even before the SendSocket function is called to send the "Stop_Stream" message.

How can I ensure that the WebSocket message is sent before the WebSocket is closed due to task completion?

Essentially the IPCameraMethod should wait for the Main task to complete before closing it and hence closing the websocket connection, but it seems that it closes immediately.

These are the output im getting:

Exception thrown: 'System.OperationCanceledException' in mscorlib.dll
ReceiveSocket operation was cancelled.
Exception thrown: 'System.ArgumentNullException' in OpenCvSharp.dll
Exception caught while processing: Value cannot be null.
Parameter name: buf
Details: System.ArgumentNullException: Value cannot be null.
Parameter name: buf
   at OpenCvSharp.Cv2.ImDecode(Byte[] buf, ImreadModes flags)
   at Iris.Services.IPCameraService.<RunYolo>d__74.MoveNext() in C:\Users\nico_\Desktop\Projects\Iris\Iris WPF\Iris Y\Iris-Latest\Iris\Services\IPCameraService.cs:line 317

This is the full code for IPCameraMethod:

private async Task IPCameraMethod()
        {
            if (string.IsNullOrWhiteSpace(textBoxUrl.Text) ||
                !int.TryParse(SourceComboBox.Name.Replace("comboBox", ""), out int cameraIndex) ||
                comboBoxSavedCameras.SelectedIndex < 0)
            {
                return;
            }

            string url = textBoxUrl.Text;
            int selectedItemIndex = comboBoxSavedCameras.SelectedIndex;

            if (tokenSources.TryGetValue(cameraIndex, out var tuple))
            {
                if (selectedItemIndex != tuple.Item1)
                {
                    tuple.Item2.Cancel();
                    
                    try
                    {
                        await tuple.Item3; // Await the task's completion

                    }

                    catch (OperationCanceledException)
                    {
                        // Task was cancelled, expected behavior
                    }
                    tuple.Item2.Dispose();
                    tokenSources.Remove(cameraIndex);
                    
                }
                else
                {
                    return; // If selected item is the same, we do not need to create a new task
                }
            }

            // Now that the old task has been cancelled and awaited, start the new task.
            var newCts = new CancellationTokenSource();
            var newTask = Camera.IPCameraService.Main(SourceImageControl, selectedItemIndex, newCts.Token, url);

            Debug.WriteLine("New Task Created!");
            tokenSources[cameraIndex] = Tuple.Create(selectedItemIndex, newCts, newTask);
        }

This is the full code for the Main its calling and other methods:

public async Task Main(System.Windows.Controls.Image imageControl, int cameraIndex, CancellationToken token, string cameraUrl)
        {

            CameraURLs[cameraIndex] = cameraUrl;

            await StartSocket(cameraIndex, token);
            await StartStream(cameraIndex, cameraUrl, token);
            var stopStreamCompletion = new TaskCompletionSource<bool>();
            EventHandler(cameraUrl, cameraIndex, token);
            while (!token.IsCancellationRequested) // Added a token check to allow stopping the loop externally
            {
                var frame = await RunYolo(cameraIndex, token);
                if (frame != null)
                    await UpdateDisplay(frame, imageControl, token);
            }
            // Close WebSocket when CancellationToken is triggered
            if (token.IsCancellationRequested)
            {
                var Stop_Server = new
                {
                    command = "Stop_Stream"
                };
                await SendSocket(cameraIndex, Stop_Server, token);
                Debug.WriteLine("Websocket Cleared!");
            }
        }




        public async Task StartStream(int cameraIndex, string cameraUrl, CancellationToken token)
        {
            try
            {
                // Check for cancellation before sending the request
                token.ThrowIfCancellationRequested();

                var settings = new
                {
                    command = "Start_Stream",
                    camera_url = cameraUrl
                };

                await SendSocket(cameraIndex, settings, token); // Pass the cancellation token to SendSocket
            }
            catch (OperationCanceledException)
            {
                // Handle cancellation
                Debug.WriteLine("StartStream was cancelled.");
                // If needed, you can add additional logic here to clean up or log.
                return;
            }
            catch (System.Net.WebSockets.WebSocketException)
            {
                // Existing logic
                Console.WriteLine("WebSocketException occurred while starting Stream. Retrying...");
                await Task.Delay(1000); // Wait for a second before retrying
            }
        }



        private async Task<Mat> RunYolo(int cameraIndex, CancellationToken token)
        {
            try
            {
                // Check for cancellation before doing any work
                token.ThrowIfCancellationRequested();

                // Run your operations
                byte[] imageBytes = await ReceiveSocket(cameraIndex, token);

                // Convert bytes to Mat
                Mat frame = Cv2.ImDecode(imageBytes, ImreadModes.Color);

                return frame;
            }
            catch (OperationCanceledException)
            {
                // Handle cancellation here, logging or other side-effects if needed
                Debug.WriteLine("RunYolo operation was cancelled.");
                return null;
            }
            catch (Exception ex)
            {
                // Log or handle other types of exceptions as needed
                Console.WriteLine($"Exception caught while processing: {ex.Message}");
                Console.WriteLine($"Details: {ex}");
                return null;
            }
        }


        public async Task<byte[]> ReceiveSocket(int cameraIndex, CancellationToken token)
        {
            try
            {
                // Check for cancellation before receiving
                token.ThrowIfCancellationRequested();
                var ws = CameraWebsockets[cameraIndex];
                List<byte> fullMessage = new List<byte>();
                WebSocketReceiveResult result;

                do
                {
                    var messageReceived = new ArraySegment<byte>(new byte[4096]);
                    result = await ws.ReceiveAsync(messageReceived, token);
                    fullMessage.AddRange(messageReceived.Skip(messageReceived.Offset).Take(result.Count));
                } while (!result.EndOfMessage); // Continue reading until you get the entire message.

                return fullMessage.ToArray();
            }
            catch (OperationCanceledException)
            {
                // Handle cancellation by logging or other side-effects if needed
                Debug.WriteLine("ReceiveSocket operation was cancelled.");
                return null;
            }
            catch (System.Net.WebSockets.WebSocketException)
            {
                await StartSocket(cameraIndex, token);
                await StartStream(cameraIndex, CameraURLs[cameraIndex],token);

                return await ReceiveSocket(cameraIndex,token);
            }
        }

        public async Task SendSocket(int cameraIndex, object message, CancellationToken token)
        {
            try
            {
                token.ThrowIfCancellationRequested();

                var ws = CameraWebsockets[cameraIndex];
                var Start_ServerJson = JsonConvert.SerializeObject(message);
                var Start_ServerBytes = Encoding.UTF8.GetBytes(Start_ServerJson);

                await ws.SendAsync(new ArraySegment<byte>(Start_ServerBytes, 0, Start_ServerBytes.Length), WebSocketMessageType.Text, true, token);
            }
            catch (OperationCanceledException)
            {
                // Handle cancellation here, perhaps log it if needed
                Debug.WriteLine("SendSocket operation was cancelled.");
            }
            catch (System.Net.WebSockets.WebSocketException)
            {
                // When a WebSocketException occurs, you restart the socket and stream.
                // Be sure to consider the cancellation token when you're restarting as well.
                await StartSocket(cameraIndex, token);  // Consider passing token here too
                await StartStream(cameraIndex, CameraURLs[cameraIndex], token);  // And here
                await SendSocket(cameraIndex, message, token);  // Recursion with the same token
            }
        }

        public async Task StartSocket(int cameraIndex, CancellationToken token)
        {
            while (true)
            {
                try
                {
                    // Check for cancellation before trying to connect
                    token.ThrowIfCancellationRequested();

                    var ws = new ClientWebSocket();
                    await ws.ConnectAsync(new Uri(wsUrl), token); // Pass the cancellation token here

                    if (CameraWebsockets.ContainsKey(cameraIndex))
                    {
                        CameraWebsockets[cameraIndex] = ws;
                    }
                    else
                    {
                        CameraWebsockets.TryAdd(cameraIndex, ws);
                    }
                    return; // Break the loop if connected successfully
                }
                catch (OperationCanceledException)
                {
                    // Handle cancellation by logging or other side-effects if needed
                    Debug.WriteLine("StartSocket operation was cancelled.");
                    return;  // Exit the loop if cancelled
                }
                catch (System.Net.WebSockets.WebSocketException)
                {
                    Console.WriteLine("WebSocketException occurred while starting socket. Retrying...");
                    await Task.Delay(1000, token); // Pass the cancellation token here too
                }
            }
        }
        private async Task UpdateDisplay(Mat frame, System.Windows.Controls.Image imageControl, CancellationToken token)
        {
            // Check for cancellation before updating the UI
            if (token.IsCancellationRequested)
            {
                Debug.WriteLine("UpdateDisplay operation was cancelled.");
                return;
            }
            await System.Windows.Application.Current.Dispatcher.InvokeAsync(() =>
            {
                try
                {
                    if (frame.Empty() || frame.Width == 0 || frame.Height == 0)
                    {
                        Console.WriteLine("Received an invalid frame.");
                        return; // Exit and don't process further
                    }
                    var bitmap = OpenCvSharp.Extensions.BitmapConverter.ToBitmap(frame);
                    var bitmapSource = ConvertToBitmap(bitmap);
                    imageControl.Source = bitmapSource;
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Failed to convert frame to bitmap. Frame Size: {frame.Size()}. Channels: {frame.Channels()}. Error: {ex.Message}");
                }
                
            });
        }

Solution

  • The issue you're facing is due to the fact that when you call await tuple.Item3 in your IPCameraMethod, effectively awaits the completion of the Main task. When the Main task is completed (either normally or due to cancellation), it can also lead to the WebSocket being closed immediately

    public async Task Main(System.Windows.Controls.Image imageControl, int cameraIndex, CancellationToken token, string cameraUrl) { // ... Initialize WebSocket and other operations

    // Create a TaskCompletionSource to signal when the "Stop_Stream" message is sent
    var stopStreamCompletion = new TaskCompletionSource<bool>();
    
    while (!token.IsCancellationRequested)
    {
        // ... Fetch and display video frames
    }
    
    if (token.IsCancellationRequested)
    {
        var Stop_Server = new
        {
            command = "Stop_Stream"
        };
    
        // Send the "Stop_Stream" message and await its completion
        await SendSocket(cameraIndex, Stop_Server, token);
    
        // Signal that the message has been sent
        stopStreamCompletion.SetResult(true);
    }
    
    // Now, await the completion of the Main task
    await stopStreamCompletion.Task;
    

    }