Search code examples
c#.netsemaphoretpl-dataflow

async Producer-Consumer application with concurrent scheduler


I was implementing a Producer-Consumer using BufferBlock. The code is working well.

static async Task Produce(ITargetBlock<int> queue)
{
    try
    {
        // Post messages to the block asynchronously. 
        for (int i = 0; i < 100; i++)
        {
            Console.WriteLine("Sending: {0}", i);
            await queue.SendAsync(i);
        }
    }
    finally 
    {
        queue.Complete();
    }
}

static async Task Consume(ISourceBlock<int> queue)
{
    // Read messages from the block asynchronously. 
    while (await queue.OutputAvailableAsync())
    {
        int value = await queue.ReceiveAsync();
        Console.WriteLine("Receiving: {0}", value);
    }
}

static void Main(string[] args)
{
    // Create a BufferBlock<int> object. 
    var queue = new BufferBlock<int>();

    try
    {
        var produce = Produce(queue);
        var consume = Consume(queue);

        Task.WaitAll(produce, consume, queue.Completion);
    }
    catch (Exception exception)
    {
        Console.WriteLine("An exception was thrown: {0}", exception.Message);
        Console.WriteLine("Terminating...");
    }
}

Now I have a throttling issue, which is I want to the maximum concurrent number for consumer is 4. I want to use SemaphoreSlim bot not sure how to apply it.

Notice: it is a concurrent scheduler question, not parallelism problem.


Solution

  • If all you want is to be able to consume a certain amount at a time you can simply call TryRecieve multiple times until it's empty, or the amount was reached. Here's an extension method that handles that:

    public static bool TryReceive<T>(this BufferBlock<T> bufferBlock, int count, out IList<T> items)
    {
        items = new List<T>();   
        for (var i = 0; i < count; i++)
        {
            T item;
            if (bufferBlock.TryReceive(out item))
            {
                items.Add(item);
            }
            else
            {
                break;
            }
        }
        return items.Any();
    }
    

    And so the consumer becomes:

    static async Task Consume(BufferBlock<int> queue)
    {
        // Read messages from the block asynchronously. 
        while (await queue.OutputAvailableAsync())
        {
            IList<int> values;
            queue.TryReceive(4, out values);
            Console.WriteLine("Receiving: {0}", string.Join(", ", values));
        }
    }