I am implementing a producer-consumer pattern using BufferBlock. The code works well:
static async Task Produce(ITargetBlock<int> queue)
{
    try
    {
        // Post messages to the block asynchronously.
        for (int i = 0; i < 100; i++)
        {
            Console.WriteLine("Sending: {0}", i);
            await queue.SendAsync(i);
        }
    }
    finally
    {
        queue.Complete();
    }
}

static async Task Consume(ISourceBlock<int> queue)
{
    // Read messages from the block asynchronously.
    while (await queue.OutputAvailableAsync())
    {
        int value = await queue.ReceiveAsync();
        Console.WriteLine("Receiving: {0}", value);
    }
}

static void Main(string[] args)
{
    // Create a BufferBlock<int> object.
    var queue = new BufferBlock<int>();
    try
    {
        var produce = Produce(queue);
        var consume = Consume(queue);
        Task.WaitAll(produce, consume, queue.Completion);
    }
    catch (Exception exception)
    {
        Console.WriteLine("An exception was thrown: {0}", exception.Message);
        Console.WriteLine("Terminating...");
    }
}
Now I have a throttling issue: I want the maximum number of concurrent consumers to be 4. I want to use SemaphoreSlim but I'm not sure how to apply it.
Note: this is a concurrent scheduling question, not a parallelism problem.
If all you want is to consume a fixed number of items at a time, you can simply call TryReceive repeatedly until the buffer is empty or the requested count is reached. Here's an extension method that handles that:
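// Extension methods must be declared inside a static class.
public static bool TryReceive<T>(this BufferBlock<T> bufferBlock, int count, out IList<T> items)
{
    items = new List<T>();
    for (var i = 0; i < count; i++)
    {
        T item;
        if (bufferBlock.TryReceive(out item))
        {
            items.Add(item);
        }
        else
        {
            // The buffer is empty, so stop early.
            break;
        }
    }
    // True if at least one item was received (avoids needing System.Linq for Any()).
    return items.Count > 0;
}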
And so the consumer becomes:
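static async Task Consume(BufferBlock<int> queue)
{
    // Read messages from the block asynchronously, up to 4 at a time.
    while (await queue.OutputAvailableAsync())
    {
        IList<int> values;
        // Another consumer may have drained the buffer in the meantime,
        // so only print when something was actually received.
        if (queue.TryReceive(4, out values))
        {
            Console.WriteLine("Receiving: {0}", string.Join(", ", values));
        }
    }
}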
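If the goal is instead to have at most 4 items being processed concurrently, which is what the question asks about SemaphoreSlim, something along these lines may work. This is only a rough sketch under some assumptions: ConsumeThrottled, RunAsync, the handler delegate and the ThrottledConsumer class are hypothetical names that are not part of the original code, and there is a single reading loop that hands each item to an asynchronous handler.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class ThrottledConsumer
{
    // Hypothetical helper: reads items from the block and runs the handler
    // for each one, with at most maxConcurrency handlers in flight at once.
    public static async Task ConsumeThrottled(
        ISourceBlock<int> queue, int maxConcurrency, Func<int, Task> handler)
    {
        using (var throttle = new SemaphoreSlim(maxConcurrency))
        {
            var running = new List<Task>();
            while (await queue.OutputAvailableAsync())
            {
                int value = await queue.ReceiveAsync();
                // Wait asynchronously for a free slot before starting another handler.
                await throttle.WaitAsync();
                running.Add(RunAsync(value, handler, throttle));
            }
            // Wait for the handlers that are still in flight before disposing the semaphore.
            await Task.WhenAll(running);
        }
    }

    private static async Task RunAsync(int value, Func<int, Task> handler, SemaphoreSlim throttle)
    {
        try
        {
            await handler(value);
        }
        finally
        {
            // Always free the slot, even if the handler throws.
            throttle.Release();
        }
    }
}
```

In Main, the consumer task would then be created with something like ThrottledConsumer.ConsumeThrottled(queue, 4, ProcessAsync), where ProcessAsync stands for whatever asynchronous work each item needs. Note that the sketch keeps every started task in a list until the end, which is fine for 100 items but would need trimming for a long-running queue.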