Search code examples
c#asynchronousvideo-streamingtasktpl-dataflow

Asynchronous Task, video buffering


I am trying to understand Tasks in C# but still having some problems. I am trying to create an application containing video. The main purpose is to read the video from a file (I am using Emgu.CV) and send it via TCP/IP for process in a board and then back in a stream (real-time) way. Firstly, I did it in serial. So, reading a Bitmap, sending-receiving from board, and plotting. But reading the bitmaps and plotting them takes too much time. I would like to have a Transmit, Receive FIFO Buffers that save the video frames, and a different task that does the job of sending receiving each frame. So I would like to do it in parallel. I thought I should create 3 Tasks:

        tasks.Add(Task.Run(() => Video_load(video_path)));
        tasks.Add(Task.Run(() => Video_Send_Recv(video_path)));
        tasks.Add(Task.Run(() => VideoDisp_hw(32)));

Which I would like to run "parallel". What type of object should I use? A concurrent queue? BufferBlock? or just a list?

Thanks for the advices! I would like to ask something. I am trying to create a simple console program with 2 TPL blocks. 1 Block would be Transform block (taking a message i.e. "start" ) and loading data to a List and another block would be ActionBlock (just reading the data from the list and printing them). Here is the code below:

namespace TPL_Dataflow
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Hello World!");
            Random randn = new Random();

            var loadData = new TransformBlock<string, List<int>>(async sample_string =>
           {
               List<int> input_data = new List<int>();
               int cnt = 0;

                if (sample_string == "start")
                {
                   Console.WriteLine("Inside loadData");
                   while (cnt < 16)
                   {
                       input_data.Add(randn.Next(1, 255));
                       await Task.Delay(1500);
                       Console.WriteLine("Cnt");
                       cnt++;
                   }
                                    }
                else
                {
                    Console.WriteLine("Not started yet");

                }
            return input_data;
           });


            var PrintData = new ActionBlock<List<int>>(async input_data =>
            {
                while(input_data.Count > 0)
                {


                    Console.WriteLine("output Data = " + input_data.First());
                    await Task.Delay(1000);
                    input_data.RemoveAt(0);
                    
                }
 

              });

            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
            loadData.LinkTo(PrintData, input_data => input_data.Count() >0  );
            //loadData.LinkTo(PrintData, linkOptions);
            
            loadData.SendAsync("start");
            loadData.Complete();
            PrintData.Completion.Wait();

        }
    }
}

But it seems to work in serial way.. What am I doing wrong? I tried to do the while loops async. I would like to do the 2 things in parallel. When data available from the List then plotted.


Solution

  • You could use a TransformManyBlock<string, int> as the producer block, and an ActionBlock<int> as the consumer block. The TransformManyBlock would be instantiated with the constructor that accepts a Func<string, IEnumerable<int>> delegate, and passed an iterator method (the Produce method in the example below) that yields values one by one:

    Random random = new Random();
    
    var producer = new TransformManyBlock<string, int>(Produce);
    
    IEnumerable<int> Produce(string message)
    {
        if (message == "start")
        {
            int cnt = 0;
            while (cnt < 16)
            {
                int value;
                lock (random) value = random.Next(1, 255);
                Console.WriteLine($"Producing #{value}");
                yield return value;
                Thread.Sleep(1500);
                cnt++;
            }
        }
        else
        {
            yield break;
        }
    }
    
    var consumer = new ActionBlock<int>(async value =>
    {
        Console.WriteLine($"Received: {value}");
        await Task.Delay(1000);
    });
    
    producer.LinkTo(consumer, new() { PropagateCompletion = true });
    
    producer.Post("start");
    producer.Complete();
    consumer.Completion.Wait();
    

    Unfortunately the producer has to block the worker thread during the idle period between yielding each value (Thread.Sleep(1500);), because the TransformManyBlock currently does not have a constructor that accepts a Func<string, IAsyncEnumerable<int>>. This will be probably fixed in the next release of the TPL Dataflow library. You could track this GitHub issue, to be informed about when this feature will be released.


    Alternative solution: Instead of linking explicitly the producer and the consumer, you could keep them unlinked, and send manually the values produced by the producer to the consumer. In this case both blocks would be ActionBlocks:

    Random random = new Random();
    
    var consumer = new ActionBlock<int>(async value =>
    {
        Console.WriteLine($"Received: {value}");
        await Task.Delay(1000);
    });
    
    var producer = new ActionBlock<string>(async message =>
    {
        if (message == "start")
        {
            int cnt = 0;
            while (cnt < 16)
            {
                int value;
                lock (random) value = random.Next(1, 255);
                Console.WriteLine($"Producing #{value}");
                var accepted = await consumer.SendAsync(value);
                if (!accepted) break; // The consumer has failed
                await Task.Delay(1500);
                cnt++;
            }
        }
    });
    
    PropagateCompletion(producer, consumer);
    
    producer.Post("start");
    producer.Complete();
    consumer.Completion.Wait();
    
    async void PropagateCompletion(IDataflowBlock source, IDataflowBlock target)
    {
        try { await source.Completion.ConfigureAwait(false); } catch { }
        var ex = source.Completion.IsFaulted ? source.Completion.Exception : null;
        if (ex != null) target.Fault(ex); else target.Complete();
    }
    

    The main difficulty with this approach is how to propagate the completion of the producer to the consumer, so that eventually both blocks are completed. Obviously you can't use the new DataflowLinkOptions { PropagateCompletion = true } configuration, since the blocks are not linked explicitly. You also can't Complete manually the consumer, because in this case it would stop prematurely accepting values from the producer. The solution to this problem is the PropagateCompletion method shown in the above example.