TPL Dataflow: finish a block, re-create a block


I am using TPL Dataflow to display a video; each frame is first sent over TCP to a board. I use a CancellationTokenSource to cancel the blocks' work. The problem is that when I run the "CreateVideoProcessingNetwork" function again, I get no response: the .Post() call returns false. How should I re-create or "rerun" a TPL Dataflow pipeline? Here is the code:

private void TPL_Click(object sender, EventArgs e)
        {
            CreateVideoProcessingNetwork();
        }

        

        public async void CreateVideoProcessingNetwork()
        {
            string video_path = @"C:\.....\video_640x360_360p.mp4";

            
            _canceller = new CancellationTokenSource();
            /****************** METHOD 1 - with yield *************/

            /* Video Loading TPL Block */
            //var video_loader = new TransformManyBlock<string, Bitmap>(load_video, new ExecutionDataflowBlockOptions { BoundedCapacity = 10 });


            var send_recv_block = new TransformBlock<Bitmap, Bitmap>(async recv_bitmap =>
            {
                Console.WriteLine("Inside send_recv block");
                var mem_stream = new MemoryStream();
                recv_bitmap.Save(mem_stream, System.Drawing.Imaging.ImageFormat.Jpeg);
                var recv_image_array = mem_stream.ToArray();


                NetworkStream stream = client.GetStream();
                byte[] transmit_buffer = new byte[4];
                transmit_buffer[0] = (byte)(recv_image_array.Length & (0xFF));
                transmit_buffer[1] = (byte)((recv_image_array.Length >> 8) & (0xFF));
                transmit_buffer[2] = (byte)((recv_image_array.Length >> 16) & (0xFF));
                transmit_buffer[3] = (byte)((recv_image_array.Length >> 24) & (0xFF));
                // Sending first the 32bit length
                await stream.WriteAsync(transmit_buffer, 0, 4);
                // Sending data
                await stream.WriteAsync(recv_image_array, 0, recv_image_array.Length);
                // Receiving data
                var recv_buffer = await Receive(stream);

                Bitmap tx_image_array;
                using (var ms = new MemoryStream(recv_image_array))
                {
                    tx_image_array = new Bitmap(ms);
                }

                return tx_image_array;
                

            },
            new ExecutionDataflowBlockOptions
            {
                //BoundedCapacity = 10,
                CancellationToken = cancellationSource.Token
            });



            /****************** METHOD 2 - with send async  ***********/
            var video_loader = new ActionBlock<string>(async path =>
            {

 

                Console.WriteLine("video_loader");
                capture = new VideoCapture(path);
                Mat matrix = new Mat();
                capture.Read(matrix);
                var mem_stream = new MemoryStream();

                while (matrix.Rows != 0 && matrix.Width != 0)
                {

                    Console.WriteLine("Inside Loop");
                    capture.Read(matrix);
                    if (matrix.Rows == 0 && matrix.Width == 0) break;
                    Bitmap bitmap = new Bitmap(matrix.Width, matrix.Rows);
                    bitmap = matrix.ToBitmap();

                    await send_recv_block.SendAsync(bitmap);
                    await Task.Delay(20);
                    if (_canceller.Token.IsCancellationRequested) break;

                }

            }, new ExecutionDataflowBlockOptions 
            {
                //BoundedCapacity = 10 ,
                CancellationToken = cancellationSource.Token
            });



            /* Video Display TPL Block */
            var display_video = new ActionBlock<Bitmap>(async received_image =>
            {
                Console.WriteLine("Inside Display Video");
                PicturePlot2.Image = received_image;
                await Task.Delay(25);
            },
            new ExecutionDataflowBlockOptions()
            {
                TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext(),
                //BoundedCapacity = 10,
                CancellationToken = cancellationSource.Token
            });


            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };


            /****************** METHOD 2 - with send async *************/
            Console.WriteLine("to Link");
            var send_recv_disposable = send_recv_block.LinkTo(display_video, linkOptions);
            Console.WriteLine("Video path" + video_path);
            //var apotelesma_post = video_loader.Post(video_path);
            var apotelesma_post = await video_loader.SendAsync(video_path);
            Console.WriteLine("Apotelesma Post "+ apotelesma_post);
            video_loader.Complete();
            try
            {
                await display_video.Completion;
            }
            catch (TaskCanceledException ex)
            {
                Console.WriteLine(ex.CancellationToken.IsCancellationRequested);
                video_loader.Complete();
                send_recv_block.Complete();
                display_video.Complete();
                MessageBox.Show("Video Ended");
                
            }

            
        }
            
        private  void Stop_Reset_Click(object sender, EventArgs e)
        {
            cancellationSource.Cancel();
            _canceller.Cancel();



        }

Thanks in Advance


Solution

  • You haven't shown where you declare the variable cancellationSource, which you pass to ExecutionDataflowBlockOptions.

    When you supply a cancellation token through ExecutionDataflowBlockOptions, you are telling the block to enter the Completed state with a task status of Canceled when the token is cancelled. The docs tell us this is final:

    Because the CancellationToken property permanently cancels dataflow block execution, the whole pipeline must be recreated after the user cancels the operation and then wants to add more work items to the pipeline.[1]

    Because your stop button cancels this token and the same cancellationSource is reused on the next run, the blocks you re-create start out already cancelled, so they decline every message you offer them.

    Just above _canceller = new CancellationTokenSource(); add cancellationSource = new CancellationTokenSource(); so that each run gets a fresh token.
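
    The effect can be reproduced in isolation. The following is a minimal sketch, not the asker's pipeline: the class name PipelineRestartDemo, the method PostAfterStop and the static cancellationSource field are hypothetical stand-ins for the form's field and the Stop/Start button handlers. It assumes the System.Threading.Tasks.Dataflow package is referenced.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class PipelineRestartDemo
{
    // Hypothetical stand-in for the form-level cancellationSource field.
    private static CancellationTokenSource cancellationSource = new CancellationTokenSource();

    // Simulates "Stop" followed by "Start": cancels the current source,
    // optionally replaces it, builds a new block, and offers it one item.
    // Returns whatever Post() reports (true = accepted, false = declined).
    public static bool PostAfterStop(bool recreateSource)
    {
        // What the stop button does.
        cancellationSource.Cancel();

        if (recreateSource)
        {
            // The fix from the answer: a fresh source for the new run.
            cancellationSource = new CancellationTokenSource();
        }

        // What the start button does: build a new block bound to the
        // token of whichever source is current at this point.
        var block = new ActionBlock<int>(
            item => Console.WriteLine("Processing " + item),
            new ExecutionDataflowBlockOptions
            {
                CancellationToken = cancellationSource.Token
            });

        return block.Post(1);
    }

    public static void Main()
    {
        // Reusing the cancelled source: the new block should decline the item.
        Console.WriteLine("Reused source, accepted:    " + PostAfterStop(recreateSource: false));

        // Replacing the source first: the new block should accept the item.
        Console.WriteLine("Recreated source, accepted: " + PostAfterStop(recreateSource: true));
    }
}
```

    In the question's code the same idea applies to every block that receives cancellationSource.Token: the new source has to be assigned before any of the blocks is constructed, because each block captures the token at construction time.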