Tags: c#, multithreading, async-await, concurrency, queue

Handling Frame Order in a Concurrent Processing Scenario with Bounding Box Updates


Intro:

I'm working on a video processing application in C# where frames are captured, processed for object detection (using YOLO), and then displayed. I'm encountering challenges in maintaining the correct order of frames due to the nature of asynchronous processing and the FIFO structure of ConcurrentQueue.

Scenario:

Frames are captured continuously and added to a ConcurrentQueue. Each FrameData object contains an OpenCV Mat frame and a list of bounding boxes (initially empty). In the processing task, I dequeue a frame, run YOLO to get bounding boxes, and then update the frame with these bounding boxes. The updated frame needs to be placed back in its original position in the queue.

Code Snippet:

        public class FrameData
        {
            public Mat Frame { get; set; }
            public List<Dictionary<string, object>> BoundingBoxes { get; set; }

            public FrameData(Mat frame, List<Dictionary<string, object>> boundingBoxes = null)
            {
                Frame = frame;
                BoundingBoxes = boundingBoxes;
            }
        }


        private ConcurrentQueue<FrameData> frameBuffer = new ConcurrentQueue<FrameData>();

        public void StartCapture()
        {
            if (videoCaptures != null)
            {
                cancellationTokenSource = new CancellationTokenSource();
                CancellationToken token = cancellationTokenSource.Token;
                Task updateFrame = UpdateFrame(token);
                Task processFrame = ProcessFrame(token);

                if (RunODModel && ObjectDetection)
                {
                    
                }
            }

        }

        public async Task UpdateFrame(CancellationToken token)
        {
            if (isCapturing && !isPaused)
            {
                return; // Video is already playing, so just return
            }
            var capture = videoCaptures;
            const int maxBufferSize = 30;
            isCapturing = true;
            isPaused = false;

            while (isCapturing && capture.IsOpened() && !token.IsCancellationRequested)
            {
                Mat frame = new Mat();
                if (isPaused)
                {
                    await Task.Delay(100);  // Wait a bit before checking again.
                    continue;
                }

                if (capture.Read(frame) && !frame.Empty())
                {
                    frameBuffer.Enqueue(new FrameData(frame)); // Enqueue with no bounding boxes

                    // Display and dispose of the oldest frame once the buffer holds 20 or more frames
                    if (frameBuffer.Count >= 20)
                    {
                        if (frameBuffer.TryDequeue(out FrameData oldestFrameData))
                        {
                            var bitmapSource = WriteableBitmapConverter.ToWriteableBitmap(oldestFrameData.Frame);
                            imageControl.Source = bitmapSource; // Assuming imageControl is accessible here
                            oldestFrameData.Frame.Dispose();    // The converter copies the pixel data, so the Mat can be released here
                        }
                    }
                    if (frameBuffer.Count >= maxBufferSize)
                    {
                        frameBuffer.TryDequeue(out _); // Remove the oldest frame if buffer is full
                    }
                }
                else
                {
                    frame.Dispose();
                }

                Console.WriteLine($"Frame buffer count: {frameBuffer.Count}");

                // Add delay if needed to control frame rate
                await Task.Delay(33); // For example, delay for ~30fps
            }

            // Cleanup: Dispose of any frames left in the buffer
            while (!frameBuffer.IsEmpty)
            {
                if (frameBuffer.TryDequeue(out FrameData remainingFrameData))
                {
                    remainingFrameData.Frame.Dispose(); // Dispose the Mat object within the FrameData
                }
            }
        }


        public async Task ProcessFrame(CancellationToken token)
        {
            while (!token.IsCancellationRequested)
            {
                if (frameBuffer.TryDequeue(out FrameData frameData))
                {
                    List<Dictionary<string, object>> boundingBoxes = await RunYolo(frameData.Frame);
                    frameData.BoundingBoxes = boundingBoxes; // Add bounding boxes to the frame data

                    frameBuffer.Enqueue(frameData); // Enqueue frame data back to the buffer
                }
                else
                {
                    await Task.Delay(10);
                }
            }
        }


        private async Task<List<Dictionary<string, object>>> RunYolo(Mat frame)
        {
            const string InferenceRequest = "/process";
            const int YoloInputSize = 640;  // replace with actual input size

            // Convert the OpenCV Mat to a byte array in PNG format
            var byteContent = frame.ToBytes(".png");
            using var content = new MultipartFormDataContent
            {
                { new ByteArrayContent(byteContent), "image_file", "frame.png" },
                { new StringContent("iris"), "model_name" },
                { new StringContent(YoloInputSize.ToString()), "img_size" },
                { new StringContent(ObjectTracking ? "true" : "false"), "runFaceRec" },
                { new StringContent(yoloAccuracy.ToString()), "yoloAccuracy" }, // accuracy threshold
                { new StringContent(oTFDAccuracy.ToString()), "oTFDAccuracy" }, // accuracy threshold
                { new StringContent(oTFRAccuracy.ToString()), "oTFRAccuracy" }, // accuracy threshold
                { new StringContent(ObjectTracking ? "true" : "false"), "apply_tracking" },
            };

            var response = await client.PostAsync(ProcUrl + InferenceRequest, content);
            if (!response.IsSuccessStatusCode)
            {
                throw new HttpRequestException($"Response status code does not indicate success: {response.StatusCode} ({response.ReasonPhrase}).");
            }
            var jsonResponse = await response.Content.ReadAsStringAsync();
            var resultsList = JsonConvert.DeserializeObject<List<Dictionary<string, object>>>(jsonResponse);
            return resultsList;
        }

Challenge:

Since ConcurrentQueue is a FIFO structure, re-enqueuing a processed frame places it at the end of the queue, disrupting the original capture order. This leads to an issue where frames are not displayed in the order they were captured, causing visual inconsistency in the video playback.
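To make the symptom concrete, here is a minimal stand-alone illustration (plain integers standing in for frames) of how a single dequeue-and-re-enqueue cycle reorders the buffer:

    var queue = new ConcurrentQueue<int>();
    queue.Enqueue(1);
    queue.Enqueue(2);
    queue.Enqueue(3);

    // "Process" frame 1: take it off the front, then put it back.
    queue.TryDequeue(out int processed);   // processed == 1
    queue.Enqueue(processed);

    // The queue now reads 2, 3, 1 - frame 1 would be displayed last.
    Console.WriteLine(string.Join(", ", queue));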

Attempts:

Using a List<FrameData> with Locks: I tried replacing ConcurrentQueue with a List and used locks for thread safety. While this allowed accessing the newest frame directly, it introduced complexity in managing thread safety and potential performance bottlenecks due to locking.
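Roughly what that locked-list variant looked like (the member names here are illustrative, not my exact code):

    private readonly List<FrameData> frameList = new List<FrameData>();
    private readonly object frameLock = new object();

    // Capture side: append the new frame and evict the oldest when over the cap.
    private void AddFrame(FrameData frameData)
    {
        lock (frameLock)
        {
            frameList.Add(frameData);
            if (frameList.Count > 30)
            {
                frameList[0].Frame.Dispose();
                frameList.RemoveAt(0);
            }
        }
    }

    // Processing side: grab the newest frame under the lock, run YOLO outside it.
    private async Task ProcessNewestAsync()
    {
        FrameData newest = null;
        lock (frameLock)
        {
            if (frameList.Count > 0)
                newest = frameList[frameList.Count - 1];
        }
        if (newest == null)
            return;

        // Risk: the eviction above can dispose newest.Frame while YOLO is still running -
        // exactly the kind of extra coordination that made this approach feel heavy.
        newest.BoundingBoxes = await RunYolo(newest.Frame);
    }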

Double-Ended Queue (Deque): I considered using a deque to access both ends of the queue. However, .NET doesn't provide a built-in deque, and using third-party libraries or custom implementations might complicate the scenario.
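For completeness, a hand-rolled deque along those lines might look like the sketch below (a LinkedList guarded by a lock); it works, but it is more surface area to maintain:

    // Minimal thread-safe deque, illustrative only.
    public class SimpleDeque<T>
    {
        private readonly LinkedList<T> items = new LinkedList<T>();
        private readonly object gate = new object();

        // Producer side: newly captured frame goes on the back.
        public void AddLast(T item)
        {
            lock (gate) items.AddLast(item);
        }

        // Display side: oldest frame comes off the front.
        public bool TryRemoveFirst(out T item)
        {
            lock (gate)
            {
                if (items.Count == 0) { item = default(T); return false; }
                item = items.First.Value;
                items.RemoveFirst();
                return true;
            }
        }

        // Processing side: look at the newest frame without removing it.
        public bool TryPeekLast(out T item)
        {
            lock (gate)
            {
                if (items.Count == 0) { item = default(T); return false; }
                item = items.Last.Value;
                return true;
            }
        }
    }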

Atomic Counter with Dictionary: Another approach was using a ConcurrentDictionary keyed by an atomic counter (sequence number) for each frame. While this allowed processing frames out of order and displaying them in sequence, it significantly increased the complexity of frame management and synchronization.

Question:

How can I efficiently process the newest frame, update it with bounding boxes, and then place it back at its original position in the queue, all while maintaining the correct order for display? Are there any specific data structures or design patterns in C# that could simplify this scenario?

Re-attempted logic using an atomic counter with a dictionary (as per Shingo):

private ConcurrentDictionary<int, FrameData> frameBuffer = new ConcurrentDictionary<int, FrameData>();
private int frameCounter = 0;  // Atomic counter for frames
private const int maxBufferSize = 30;  // Define the maximum buffer size
private const int displayThreshold = 20;  // Threshold for displaying the oldest frame


        public class FrameData
        {
            public Mat Frame { get; set; }
            public List<Dictionary<string, object>> BoundingBoxes { get; set; }

            public FrameData(Mat frame, List<Dictionary<string, object>> boundingBoxes = null)
            {
                Frame = frame;
                BoundingBoxes = boundingBoxes;
            }
        }


       
        public void StartCapture()
        {
            if (videoCaptures != null)
            {
                cancellationTokenSource = new CancellationTokenSource();
                CancellationToken token = cancellationTokenSource.Token;
                Task updateFrame = UpdateFrame(token);
                Task processFrame = ProcessFrame(token);

                if (RunODModel && ObjectDetection)
                {
                    
                }
            }

        }

        public async Task UpdateFrame(CancellationToken token)
        {
            if (isCapturing && !isPaused)
            {
                return; // Video is already playing, so just return
            }

            var capture = videoCaptures;
            isCapturing = true;
            isPaused = false;

            while (isCapturing && capture.IsOpened() && !token.IsCancellationRequested)
            {
                Mat frame = new Mat();
                if (isPaused)
                {
                    await Task.Delay(100);  // Wait a bit before checking again.
                    continue;
                }

                if (capture.Read(frame) && !frame.Empty())
                {
                    int currentFrameNumber = Interlocked.Increment(ref frameCounter);
                    frameBuffer[currentFrameNumber] = new FrameData(frame); // Add frame to the buffer

                    // Display and dispose of the oldest frame if buffer size reaches the display threshold
                    if (frameBuffer.Count >= displayThreshold)
                    {
                        int oldestFrameKey = frameBuffer.Keys.Min(); // Get the oldest frame's key
                        if (frameBuffer.TryRemove(oldestFrameKey, out FrameData oldestFrameData))
                        {
                            // Display the frame
                            var bitmapSource = WriteableBitmapConverter.ToWriteableBitmap(oldestFrameData.Frame);
                            imageControl.Source = bitmapSource; // Assuming imageControl is accessible here

                            // Dispose of the frame
                            oldestFrameData.Frame.Dispose();
                        }
                    }

                    // Remove the oldest frame if buffer size exceeds the maximum limit
                    if (frameBuffer.Count > maxBufferSize)
                    {
                        int oldestFrameKey = frameBuffer.Keys.Min(); // Get the oldest frame's key
                        frameBuffer.TryRemove(oldestFrameKey, out _);
                    }
                    Debug.WriteLine(frameCounter);
                }
                else
                {
                    frame.Dispose();
                }

                Console.WriteLine($"Frame buffer count: {frameBuffer.Count}");

                // Add delay if needed to control frame rate
                await Task.Delay(33); // For example, delay for ~30fps
            }

            // Cleanup: Dispose of any frames left in the buffer
            foreach (var frameData in frameBuffer.Values)
            {
                frameData.Frame.Dispose();
            }
            frameBuffer.Clear();
        }


        public async Task ProcessFrame(CancellationToken token)
        {
            while (!token.IsCancellationRequested)
            {
                if (!frameBuffer.IsEmpty)
                {
                    // Keys.Max() throws on an empty dictionary, so guard with the IsEmpty check above
                    int latestFrameKey = frameBuffer.Keys.Max(); // Get the newest frame's key
                    if (frameBuffer.TryRemove(latestFrameKey, out FrameData frameData))
                    {
                        List<Dictionary<string, object>> boundingBoxes = await RunYolo(frameData.Frame);
                        frameData.BoundingBoxes = boundingBoxes;

                        frameBuffer.TryAdd(latestFrameKey, frameData); // Put the processed frame back under its original key
                        continue;
                    }
                }

                await Task.Delay(10); // Delay if no frame is available or the remove failed
            }
        }
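One gap that remains in this re-attempt is that UpdateFrame can still display a frame before YOLO has filled in its bounding boxes. A possible gate that only releases frames to the UI in capture order, once processed, could look like this (nextFrameToDisplay is a new, hypothetical field, not part of the code above):

        private int nextFrameToDisplay = 1; // hypothetical: next sequence number the UI expects

        // Call this on the UI thread, e.g. after each processed frame or on a timer.
        private void TryDisplayNextFrame()
        {
            // Only release the exact frame the UI is waiting for, and only once it has
            // bounding boxes, so frames always appear in capture order.
            if (frameBuffer.TryGetValue(nextFrameToDisplay, out FrameData frameData) &&
                frameData.BoundingBoxes != null &&
                frameBuffer.TryRemove(nextFrameToDisplay, out _))
            {
                var bitmapSource = WriteableBitmapConverter.ToWriteableBitmap(frameData.Frame);
                imageControl.Source = bitmapSource;
                frameData.Frame.Dispose();
                nextFrameToDisplay++;
            }
        }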

Solution

  • My recommendation would be to use TPL Dataflow. This lets you set up a processing pipeline where you can configure the degree of parallelism.

    A very simple pipeline might look something like this:

    processBlock = new TransformBlock<FrameData, FrameData>(
        DoProcessing,
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxThreads,
            BoundedCapacity = maxThreads * 2,
        });
    outputBlock = new ActionBlock<FrameData>(
        HandleResult, new ExecutionDataflowBlockOptions()
        {
            // this needs to run on the UI thread to get the correct synchronization context
            TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
        });
    
    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
    processBlock.LinkTo(outputBlock, linkOptions);
    
    ...
    
    public FrameData DoProcessing(FrameData input){
       ...
    }
    public void HandleResult(FrameData input){
       // On UI thread
       ...
    }
    

    This will process FrameData instances in parallel while delivering the results to the UI thread one at a time, in the original capture order. Just call processBlock.Post(...) to add frames. It is set up with a BoundedCapacity so that frames are dropped when processing cannot keep up, which may or may not be desirable in your particular scenario.
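
    As a rough end-to-end sketch of that recommendation (the FramePipeline wrapper, its member names, and the DoProcessing body are illustrative assumptions built around your FrameData type, not a drop-in implementation):

    using System;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;   // NuGet package: System.Threading.Tasks.Dataflow

    public class FramePipeline
    {
        private readonly TransformBlock<FrameData, FrameData> processBlock;
        private readonly ActionBlock<FrameData> outputBlock;

        // Construct this on the UI thread so FromCurrentSynchronizationContext captures it.
        public FramePipeline(int maxThreads)
        {
            processBlock = new TransformBlock<FrameData, FrameData>(
                DoProcessing,
                new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = maxThreads,
                    BoundedCapacity = maxThreads * 2   // when full, Post returns false and the frame is dropped
                    // outputs leave the block in input order by default (EnsureOrdered = true)
                });

            outputBlock = new ActionBlock<FrameData>(
                HandleResult,
                new ExecutionDataflowBlockOptions
                {
                    TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
                });

            processBlock.LinkTo(outputBlock, new DataflowLinkOptions { PropagateCompletion = true });
        }

        // Capture loop calls this for every frame; false means the frame was dropped.
        public bool Post(FrameData frame) => processBlock.Post(frame);

        // Call when capture stops; completes once all queued frames have been handled.
        public Task CompleteAsync()
        {
            processBlock.Complete();
            return outputBlock.Completion;
        }

        private async Task<FrameData> DoProcessing(FrameData input)
        {
            // e.g. input.BoundingBoxes = await RunYolo(input.Frame);  // hypothetical: your YOLO call
            await Task.Yield();
            return input;
        }

        private void HandleResult(FrameData output)
        {
            // Runs on the UI thread, in capture order: convert output.Frame, update the Image control, dispose the Mat.
        }
    }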