How to Use Dataflow With Only BroadcastBlock and ActionBlock


This is my first question on SO. I'm new to Dataflow with BroadcastBlock and ActionBlock, and I hope I can find a solution here. Here's the structure.

Model

using System;
using System.Text;

class SampleModel
{
    public string Id { get; set; } = Guid.NewGuid().ToString();
    public bool Success { get; set; } = true;
    public object UniqueData { get; set; }

    // Renders each property on its own line for console output.
    public override string ToString()
    {
        StringBuilder sb = new StringBuilder();
        sb.AppendLine($"Id - {Id}");
        sb.AppendLine($"Success - {Success}");
        sb.AppendLine($"UniqueData - {UniqueData}");
        return sb.ToString();
    }
}

DataFlow Logic

class CreateDownloadTask
{
    public async Task VeryLongProcess()
    {
        await Task.Run(async () =>
        {
            Console.WriteLine("Long Process Working..");
            await Task.Delay(TimeSpan.FromSeconds(5));
            Console.WriteLine("Long Process Done..");
        });
    }

    public async Task CreateSimpleBroadcastX<T>(T data)
    {
        Action<T> process = async model =>
        {
            Console.WriteLine("Working..");
            await VeryLongProcess();
            Console.WriteLine("Done");
        };


        var broad = new BroadcastBlock<T>(null);

        var action = new ActionBlock<T>(process);

        var dflo = new DataflowLinkOptions { PropagateCompletion = true };

        broad.LinkTo(action, dflo);

        await broad.SendAsync(data);

        broad.Complete();

        await action.Completion.ContinueWith(async tsk =>
        {
            Console.WriteLine("Continue data");
        }).ConfigureAwait(false);

        Console.WriteLine("All Done");
    }
}

Caller

var task = cdt.CreateSimpleBroadcastX<SampleModel>(new SampleModel
{
    UniqueData = cdt.GetHashCode()
});
task.GetAwaiter().GetResult();
Console.WriteLine("Completed");

I expected the output to be:

Working..
Long Process Working..
Long Process Done..
Done
Continue data
All Done
Completed

But what I actually get is:

Working..
Long Process Working..
Continue data
All Done
Completed
Long Process Done..
Done

This happens when there is async-await inside the ActionBlock. Now, the question is: is it possible to get the output I expected without using a WaitHandle?

In other words, can ActionBlock.Completion wait until the Action or delegate inside the ActionBlock has finished executing?

Or am I using this pattern incorrectly?

Thanks in advance, and sorry for my bad English.


Solution

  • Your problem is here:

    Action<T> process = async model => ...
    

    That code creates an async void method, which should be avoided. One reason to avoid async void is that the caller has no way to know when the method has completed. That is exactly what is happening here: the ActionBlock<T> cannot know when your delegate has completed because it is async void, so it treats the item as processed at the first await.

    The proper delegate type for an asynchronous method without a return value that takes a single argument is Func<T, Task>:

    Func<T, Task> process = async model => ...
    

    Now that the asynchronous method returns a Task, the ActionBlock can know when it completes.
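
As a minimal sketch of that change, the program below uses Func<T, Task> so that the ActionBlock awaits the delegate before completing. It makes some assumptions that are not in the question: string stands in for SampleModel, a one-second Task.Delay stands in for VeryLongProcess, and Completion is awaited directly instead of going through ContinueWith. It also assumes System.Threading.Tasks.Dataflow is available (on .NET Framework it comes from the NuGet package of the same name).

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static async Task Main()
    {
        // Func<string, Task> picks the ActionBlock constructor overload that
        // awaits the returned Task, so the block knows when the work ends.
        Func<string, Task> process = async model =>
        {
            Console.WriteLine("Working..");
            // Stand-in for VeryLongProcess (assumption, shortened to 1 second).
            await Task.Delay(TimeSpan.FromSeconds(1));
            Console.WriteLine("Done");
        };

        var broad = new BroadcastBlock<string>(null);
        var action = new ActionBlock<string>(process);

        // Completion of the broadcast block flows on to the action block.
        broad.LinkTo(action, new DataflowLinkOptions { PropagateCompletion = true });

        await broad.SendAsync("sample");
        broad.Complete();

        // Finishes only after the async delegate has run to the end.
        await action.Completion;
        Console.WriteLine("All Done");
    }
}
```

With the delegate typed this way, "Done" is printed before "All Done", which matches the ordering the question expected.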