c# · async-await · task-parallel-library · tpl-dataflow

The unit of work problem in TPL Dataflow Pipeline


I have a classic producer/consumer problem where multiple users can simultaneously POST data to a Web API method (api/test), which triggers long-running, IO-intensive operations asynchronously. I've limited the number of concurrent operations to 5 using an ActionBlock chained to a BufferBlock.

The Producer class is registered as a singleton and the goal is to allow all calls to api/test to feed into this one queue. This means that things like completing the block aren't an option.

What is the most effective way for the controller to wait for the completion of the work it initiated?

Web API controller:

[Route("api/test")]
[ApiController]
public class TestController : ControllerBase
{
    private Producer producer;

    public TestController(Producer producer)
    {
        this.producer = producer;
    }

    [HttpPost]
    public async Task<string[]> Values()
    {
        for (int i = 1; i <= 10; i++)
        {
            await this.producer.AddAsync(1);
        }

        // I've added my work to the queue; now I need an elegant way to await its completion
        return new string[] { "value1", "value2" };
    }

}

Producer / Consumer implementation:

public class Producer
{
    private BufferBlock<int> queue;
    private ActionBlock<int> consumer;
    public List<int> results = new List<int>();

    private void InitializeChain()
    {
        queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5, });
        var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 5 };
        consumer = new ActionBlock<int>(x =>
        {
            Thread.Sleep(5000);
            Debug.WriteLine(x + " " + Thread.CurrentThread.ManagedThreadId);
            results.Add(x);
        }, consumerOptions);
        queue.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });
    }
    public async Task AddAsync(int data)
    {
        await queue.SendAsync(data);
    }
    public Producer()
    {
        this.InitializeChain();
    }
}
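
A side note on the code above: with MaxDegreeOfParallelism = 5, the consumer delegate calls results.Add(x) from several threads at once, and List&lt;int&gt; is not thread-safe. A minimal, runnable sketch of one fix using ConcurrentBag (demo names are mine; assumes the System.Threading.Tasks.Dataflow package is referenced):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class ThreadSafeResultsDemo
{
    static async Task Main()
    {
        // ConcurrentBag tolerates the concurrent Add calls that a
        // MaxDegreeOfParallelism = 5 ActionBlock will make; List<int> does not.
        var results = new ConcurrentBag<int>();
        var consumer = new ActionBlock<int>(
            x => results.Add(x),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

        for (int i = 1; i <= 100; i++)
            consumer.Post(i);

        consumer.Complete();
        await consumer.Completion;
        Console.WriteLine(results.Count); // all 100 items recorded, none lost
    }
}
```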

Solution

  • There are many approaches and synchronisation primitives you can use to solve this, each with its own benefits, fault tolerance, and issues depending on your needs. Here is an awaitable example using a TaskCompletionSource.

    Given

    public class Producer
    {
       private BufferBlock<int> _queue;
       private ActionBlock<int> _consumer;
       public Action<int,string> OnResult;
       public Producer()
       {
          InitializeChain();
       }
       private void InitializeChain()
       {
          _queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5 });
          var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 5 };    
          _consumer = new ActionBlock<int>(SomeIoWorkAsync, consumerOptions);   
          _queue.LinkTo(_consumer, new DataflowLinkOptions { PropagateCompletion = true });
       }
    
       private async Task SomeIoWorkAsync(int x)
       {
          Console.WriteLine($"{DateTime.Now.TimeOfDay:g} : Processing {x}");
          await Task.Delay(5000);
          OnResult?.Invoke(x,$"SomeResult {x}");
       }
    
       public Task AddAsync(int data) => _queue.SendAsync(data);
    }
    

    Awaitable

     You could easily refactor this to do the send and the await in one call.

    public static Task<string> WaitForConsumerAsync(Producer producer,int myId)
    {
       var tcs = new TaskCompletionSource<string>();
    
       producer.OnResult += (id,result) =>
       {
          if(id == myId)
             tcs.TrySetResult(result);
       };
    
       return tcs.Task;
    }
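
One caveat with this helper: the lambda added to OnResult is never removed, so every waiter leaks a delegate for the lifetime of the singleton Producer. A self-contained sketch of a self-unsubscribing variant (the static event here is a hypothetical stand-in for Producer.OnResult):

```csharp
using System;
using System.Threading.Tasks;

class SelfUnsubscribeDemo
{
    // stand-in for Producer.OnResult: any (id, result) event source works
    public static event Action<int, string> OnResult;

    public static Task<string> WaitForResultAsync(int myId)
    {
        var tcs = new TaskCompletionSource<string>();
        Action<int, string> handler = null;
        handler = (id, result) =>
        {
            if (id != myId) return;
            OnResult -= handler;      // detach so the delegate can be collected
            tcs.TrySetResult(result);
        };
        OnResult += handler;
        return tcs.Task;
    }

    static async Task Main()
    {
        var task = WaitForResultAsync(7);
        OnResult?.Invoke(3, "SomeResult 3"); // ignored, wrong id
        OnResult?.Invoke(7, "SomeResult 7"); // completes the waiter and detaches
        Console.WriteLine(await task);
        Console.WriteLine(OnResult == null); // handler gone: True
    }
}
```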
    

    Usage

    var producer = new Producer();
    
    // to simulate something you are waiting for: an id or whatever
    var myId = 7;
    
    // you could send and await in the same method if needed; this is just an example
    var task = WaitForConsumerAsync(producer,myId);
    
    // create random work for the bounded capacity to fill up
    // run this as a task so we don't hit the back pressure before we await (just for this test)
    _ = Task.Run(async () =>
    {
       for (int i = 1; i <= 20; i++)
          await producer.AddAsync(i);
    });
    
    // wait for your results to pop out
    var result = await task;
    
    Console.WriteLine($"{DateTime.Now.TimeOfDay:g} : Got my result {result}, now i can finish happily");
    
    // you can happily end here, the pipeline will keep going
    Console.ReadKey();
    

     Output

    12:04:41.62464 : Processing 3
    12:04:41.6246489 : Processing 1
    12:04:41.6246682 : Processing 2
    12:04:41.624641 : Processing 4
    12:04:41.624661 : Processing 5
    12:04:41.8530723 : Processing 7
    12:04:41.8530791 : Processing 8
    12:04:41.8531427 : Processing 10
    12:04:41.8530716 : Processing 6
    12:04:41.8530967 : Processing 9
    12:04:42.0531947 : Got my result SomeResult 7, now i can finish happily
    12:04:42.0532178 : Processing 11
    12:04:42.0532453 : Processing 12
    12:04:42.0532721 : Processing 14
    12:04:42.0532533 : Processing 13
    12:04:42.2674406 : Processing 15
    12:04:42.2709914 : Processing 16
    12:04:42.2713017 : Processing 18
    12:04:42.2710417 : Processing 17
    12:04:42.4689852 : Processing 19
    12:04:42.4721405 : Processing 20
    


     Note: you may need to play with the example so it doesn't time out.

    Example of doing it all at once

    public async Task<string> AddAsync(int data)
    {
       // subscribe before sending, so a fast consumer cannot raise
       // OnResult before anyone is listening
       var resultTask = WaitForConsumerAsync(data);
       await _queue.SendAsync(data);
       return await resultTask;
    }
    
    public Task<string> WaitForConsumerAsync(int data)
    {
       var tcs = new TaskCompletionSource<string>();
    
       OnResult += (id, result) =>
       {
          if (id == data)
             tcs.TrySetResult(result);
       };
    
       return tcs.Task;
    }
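
As an alternative to the event, the Producer can complete a per-item TaskCompletionSource directly, keyed in a ConcurrentDictionary, so each waiter is woken exactly once without every waiter inspecting every result. A hypothetical sketch (class and member names are mine, not the answer's; assumes the System.Threading.Tasks.Dataflow package is referenced and that ids are unique per in-flight item):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class AwaitableProducer
{
    private readonly ActionBlock<int> _consumer;
    // one pending TaskCompletionSource per in-flight item, keyed by id
    private readonly ConcurrentDictionary<int, TaskCompletionSource<string>> _pending
        = new ConcurrentDictionary<int, TaskCompletionSource<string>>();

    public AwaitableProducer()
    {
        _consumer = new ActionBlock<int>(async x =>
        {
            await Task.Delay(100); // simulated IO work
            if (_pending.TryRemove(x, out var tcs))
                tcs.TrySetResult($"SomeResult {x}");
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 5 });
    }

    public async Task<string> AddAsync(int data)
    {
        // register the completion source before sending, so the consumer
        // cannot finish the item before anyone is waiting on it
        var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
        _pending[data] = tcs;
        await _consumer.SendAsync(data);
        return await tcs.Task;
    }
}

class Demo
{
    static async Task Main()
    {
        var producer = new AwaitableProducer();
        var result = await producer.AddAsync(7);
        Console.WriteLine(result); // prints "SomeResult 7"
    }
}
```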
    

    Additional notes

     This is really only an academic example of an awaitable event. I assume that your pipeline is more complicated than the example given, that you are doing a combination of CPU-bound and IO-bound workloads, and that you actually need a BufferBlock at all; in this example it is redundant.

     1. If you were waiting on pure IO workloads, you would probably be better off just awaiting them; no pipeline needed.
     2. From the information you have given, there is no real need to create back pressure with a BoundedCapacity unless you have some sort of memory constraint.
     3. You need to be careful with BoundedCapacity and the default EnsureOrdered = true. The pipeline will be more efficient with EnsureOrdered = false: jobs pop out as they finish, back pressure is not affected by the ordering of results, and items will likely progress through the pipeline faster.
     4. You could also use another framework like Rx, which would likely make all this more elegant and fluent.
     5. You could also gain a small efficiency by setting SingleProducerConstrained = true, as your blocks are linear.
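
Notes 3 and 5 as code, for reference: a runnable sketch of the consumer options (values are illustrative; note that EnsureOrdered mainly matters for blocks that produce output, such as TransformBlock):

```csharp
using System;
using System.Threading.Tasks.Dataflow;

class OptionsDemo
{
    static void Main()
    {
        // EnsureOrdered = false lets finished results flow downstream as they
        // complete rather than in arrival order; SingleProducerConstrained = true
        // is a hint the runtime may exploit when only one block (the linked
        // BufferBlock) ever offers messages to this one.
        var consumerOptions = new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 5,
            MaxDegreeOfParallelism = 5,
            EnsureOrdered = false,
            SingleProducerConstrained = true
        };

        Console.WriteLine(consumerOptions.EnsureOrdered);             // False
        Console.WriteLine(consumerOptions.SingleProducerConstrained); // True
    }
}
```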