Search code examples
c#async-awaittpl-dataflow

Create list of ActionBlock<T> that will complete when any fail


In a scenario where await may be called on an 'empty' list of tasks.

How do I await a list of Task<T>, and then add new tasks to the awaiting list until one fails or completes.

I am sure there is must be an Awaiter or CancellationTokenSource solution for this problem.

public class LinkerThingBob
{
    private List<Task> ofmyactions = new List<Task>();

    public void LinkTo<T>(BufferBlock<T> messages) where T : class
    {
        var action = new ActionBlock<IMsg>(_ => this.Tx(messages, _));

        // this would not actually work, because the WhenAny 
        // will not include subsequent actions.
        ofmyactions.Add(action.Completion);

        // link the new action block.
        this._inboundMessageBuffer.LinkTo(block);
    }

    // used to catch exceptions since these blocks typically don't end.
    public async Task CompletionAsync()
    {
        // how do i make the awaiting thread add a new action
        // to the list of waiting tasks without interrupting it
        // or graciously interrupting it to let it know there's one more

        // more importantly, this CompletionAsync might actually be called
        // before the first action is added to the list, so I actually need
        // WhenAny(INFINITE + ofmyactions)
        await Task.WhenAny(ofmyactions);
    }
}

My problem is that I need a mechanism where I can add each of the action instances created above to a Task<T> that will complete when there is an exception.

I am not sure how best to explain this but:

  • The task must not complete until at least one call to LinkTo<T> has been made, so I need to start with an infinite task

  • each time LinkTo<T> is called, the new action must be added to the list of tasks, which may already be awaited on in another thread.


Solution

  • There isn't anything built-in for this, but it's not too hard to build one using TaskCompletionSource<T>. TCS is the type to use when you want to await something and there isn't already a construct for it. (Custom awaiters are for more advanced scenarios).

    In this case, something like this should suffice:

    public class LinkerThingBob
    {
      private readonly TaskCompletionSource<object> _tcs = new TaskCompletionSource<object>();
    
      private async Task ObserveAsync(Task task)
      {
        try
        {
          await task;
          _tcs.TrySetResult(null);
        }
        catch (Exception ex)
        {
          _tcs.TrySetException(ex);
        }
      }
    
      public void LinkTo<T>(BufferBlock<T> messages) where T : class
      {
        var action = new ActionBlock<IMsg>(_ => this.Tx(messages, _));
    
        var _ = ObserveAsync(action.Completion);
    
        this._inboundMessageBuffer.LinkTo(block);
      }
    
      public Task Completion { get { return _tcs.Task; } }
    }
    

    Completion starts in a non-completed state. Any number of blocks can be linked to it using ObserveAsync. As soon as one of the blocks completes, Completion also completes. I wrote ObserveAsync here in a way so that if the first completed block completes without error, then so will Completion; and if the first completed block completes with an exception, then Completion will complete with that same exception. Feel free to tweak for your specific needs. :)