I have a classic producer consumer problem where multiple users can simultaneously POST data to a web API method (api/test), which triggers IO intensive, long running operations asynchronously. I've limited the number of concurrent requests to 5 using an ActionBlock chained to a BufferBlock.
The Producer class is registered as a singleton, and the goal is to allow all calls to api/test to feed into this one queue. This means that things like completing the block aren't an option.
What is the most effective way to wait, from the controller, for the work I initiated to complete?
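For reference, the singleton registration is roughly the following (just a sketch; I'm assuming the standard ASP.NET Core container and a typical Startup.ConfigureServices, which isn't shown elsewhere):
public void ConfigureServices(IServiceCollection services)
{
    // ...MVC/controller registration omitted...

    // a single shared Producer so every call to api/test feeds the same queue
    services.AddSingleton<Producer>();
}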
Web API controller:
[Route("api/test")]
[ApiController]
public class TestController : ControllerBase
{
    private Producer producer;

    public TestController(Producer producer)
    {
        this.producer = producer;
    }

    [HttpGet]
    public async Task<string[]> Values()
    {
        for (int i = 1; i <= 10; i++)
        {
            await this.producer.AddAsync(1);
        }

        // i've added my work to the queue, elegant completion required
        return new string[] { "value1", "value2" };
    }
}
Producer / Consumer implementation:
public class Producer
{
    private BufferBlock<int> queue;
    private ActionBlock<int> consumer;

    public List<int> results = new List<int>();

    public Producer()
    {
        this.InitializeChain();
    }

    private void InitializeChain()
    {
        queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5 });
        var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 5 };

        consumer = new ActionBlock<int>(x =>
        {
            Thread.Sleep(5000);
            Debug.WriteLine(x + " " + Thread.CurrentThread.ManagedThreadId);
            results.Add(x);
        }, consumerOptions);

        queue.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public async Task AddAsync(int data)
    {
        await queue.SendAsync(data);
    }
}
There are many approaches and synchronisation primitives you can use to solve this, each with its own benefits, fault-tolerance characteristics, and issues depending on your needs. Here is an awaitable example using TaskCompletionSource.
Given
public class Producer
{
    private BufferBlock<int> _queue;
    private ActionBlock<int> _consumer;

    public Action<int, string> OnResult;

    public Producer()
    {
        InitializeChain();
    }

    private void InitializeChain()
    {
        _queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5 });
        var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 5 };

        _consumer = new ActionBlock<int>(SomeIoWorkAsync, consumerOptions);

        _queue.LinkTo(_consumer, new DataflowLinkOptions { PropagateCompletion = true });
    }

    private async Task SomeIoWorkAsync(int x)
    {
        Console.WriteLine($"{DateTime.Now.TimeOfDay:g} : Processing {x}");
        await Task.Delay(5000);
        OnResult?.Invoke(x, $"SomeResult {x}");
    }

    public Task AddAsync(int data) => _queue.SendAsync(data);
}
Awaitable
You could easily refactor this to do the send and the await in one call (see the example of doing it all at once further down).
public static Task<string> WaitForConsumerAsync(Producer producer, int myId)
{
    var tcs = new TaskCompletionSource<string>();

    producer.OnResult += (id, result) =>
    {
        if (id == myId)
            tcs.TrySetResult(result);
    };

    return tcs.Task;
}
Usage
var producer = new Producer();

// to simulate something you are waiting for, an id or whatever
var myId = 7;

// you could send and await in the same method if needed; this is just an example
var task = WaitForConsumerAsync(producer, myId);

// create random work for the bounded capacity to fill up;
// run this as a task so we don't hit the back pressure before we await (just for this test)
Task.Run(async () =>
{
    for (int i = 1; i <= 20; i++)
        await producer.AddAsync(i);
});

// wait for your result to pop out
var result = await task;
Console.WriteLine($"{DateTime.Now.TimeOfDay:g} : Got my result {result}, now i can finish happily");

// you can happily end here, the pipeline will keep going
Console.ReadKey();
Output
12:04:41.62464 : Processing 3
12:04:41.6246489 : Processing 1
12:04:41.6246682 : Processing 2
12:04:41.624641 : Processing 4
12:04:41.624661 : Processing 5
12:04:41.8530723 : Processing 7
12:04:41.8530791 : Processing 8
12:04:41.8531427 : Processing 10
12:04:41.8530716 : Processing 6
12:04:41.8530967 : Processing 9
12:04:42.0531947 : Got my result SomeResult 7, now i can finish happily
12:04:42.0532178 : Processing 11
12:04:42.0532453 : Processing 12
12:04:42.0532721 : Processing 14
12:04:42.0532533 : Processing 13
12:04:42.2674406 : Processing 15
12:04:42.2709914 : Processing 16
12:04:42.2713017 : Processing 18
12:04:42.2710417 : Processing 17
12:04:42.4689852 : Processing 19
12:04:42.4721405 : Processing 20
Note: you may need to play with the example so it doesn't time out.
Example of doing it all at once
public async Task<string> AddAsync(int data)
{
    await _queue.SendAsync(data);
    return await WaitForConsumerAsync(data);
}

public Task<string> WaitForConsumerAsync(int data)
{
    var tcs = new TaskCompletionSource<string>();

    OnResult += (id, result) =>
    {
        if (id == data)
            tcs.TrySetResult(result);
    };

    return tcs.Task;
}
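With that combined AddAsync in place, the controller from the question could queue its items and await all the results in one go, along these lines (just a sketch; the Values action and the item count are taken from the question above):
[HttpGet]
public async Task<string[]> Values()
{
    var tasks = new List<Task<string>>();

    for (int i = 1; i <= 10; i++)
    {
        // each task completes once the consumer has processed that item;
        // SendAsync inside AddAsync still respects the bounded capacity
        tasks.Add(this.producer.AddAsync(i));
    }

    return await Task.WhenAll(tasks);
}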
Additional notes
This is really only an academic example of an awaitable event. I assume that your pipeline is more complicated than the example given, that you are doing a combination of CPU and IO bound workloads, and that you actually need a BufferBlock; in this example it's redundant.
You likely won't need BoundedCapacity unless you have some sort of memory constraint.
Take note of the interplay between BoundedCapacity and the default EnsureOrdered = true. The pipeline will be more efficient with EnsureOrdered = false: jobs will pop out when they are finished, and back pressure will not be affected by the ordering of different results, which means items will likely progress through the pipeline faster.
You can also set SingleProducerConstrained = true as your blocks are linear.
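As a sketch of those last two options (assuming a fuller pipeline where a block actually produces output, e.g. a TransformBlock feeding the ActionBlock; the names here are illustrative only):
var options = new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 5,
    MaxDegreeOfParallelism = 5,

    // emit results as they complete rather than in submission order
    EnsureOrdered = false,

    // valid because only one source (the previous block) ever feeds this block
    SingleProducerConstrained = true
};

// hypothetical middle stage: results flow downstream as soon as each item finishes
var worker = new TransformBlock<int, string>(async x =>
{
    await Task.Delay(5000); // stand-in for the real IO work
    return $"SomeResult {x}";
}, options);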