Tags: c#, task-parallel-library, tpl-dataflow

Identifying the simultaneous tasks in a TPL Dataflow block


I have 1,000 elements in a TPL Dataflow block, and each element calls an external web service.

The web service supports at most 10 simultaneous calls, which is easy to enforce with:

    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 10,
        // ...
    }
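A minimal sketch to check that MaxDegreeOfParallelism really caps the number of calls in flight. It assumes an ActionBlock and uses Task.Delay as a stand-in for the web-service call; ThrottleDemo and PeakConcurrencyAsync are hypothetical names. The block counts an async delegate as running until its returned Task completes, so the peak should never exceed the limit.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ThrottleDemo
{
    // Posts `items` simulated calls to an ActionBlock limited to `maxParallel`
    // concurrent executions and returns the largest number seen running at once.
    public static async Task<int> PeakConcurrencyAsync(int items, int maxParallel)
    {
        var gate = new object();
        int running = 0, peak = 0;

        var block = new ActionBlock<int>(async item =>
        {
            lock (gate)
            {
                running++;
                if (running > peak) peak = running;
            }

            await Task.Delay(5); // hypothetical stand-in for the web-service call

            lock (gate)
            {
                running--;
            }
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallel });

        for (int i = 0; i < items; i++)
        {
            block.Post(i);
        }
        block.Complete();
        await block.Completion;
        return peak;
    }
}
```

Calling PeakConcurrencyAsync(100, 10) should return a value between 1 and 10.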

The web service requires each call to pass a unique ID that distinguishes it from the other simultaneous calls. In theory this could be a fresh GUID per call, but in practice the 11th GUID fails, because the server's throttling mechanism is slow to recognise that the first call has finished.

The vendor suggests we recycle the GUIDs, keeping 10 in active use.

I intend to keep an array of 10 GUIDs, and each task will use (Interlocked.Increment(ref COUNTER) % 10) as the array index. (There will never be enough calls for COUNTER to overflow.)

EDIT: I just realised this won't work! It assumes tasks complete in the order they started, which they may not. I could implement this as a queue of IDs from which each task borrows one and then returns it, but the question still stands: is there an easier, pre-built, thread-safe way to do this?
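The failure of the counter scheme can be reproduced without threads. This single-threaded sketch assumes a hypothetical timing: call #1 is slow and still running after calls #2 to #10 have finished, so the block starts call #11. CounterSlotDemo and Run are illustrative names, and the counter is incremented with plain ++ because the sketch has only one thread.

```csharp
using System;
using System.Collections.Generic;

public static class CounterSlotDemo
{
    // Simulates slot = Interlocked.Increment(ref counter) % 10 under an assumed timing.
    // Returns the slot handed to call #11 and whether a running call still holds it.
    public static (int Slot, bool Collides) Run()
    {
        const int poolSize = 10;
        int counter = 0;
        var active = new HashSet<int>();          // slots held by running calls

        int NextSlot() => ++counter % poolSize;   // single-threaded equivalent of the counter

        // Calls #1..#10 start and receive slots 1, 2, ..., 9, 0.
        var slots = new List<int>();
        for (int call = 1; call <= 10; call++)
        {
            int s = NextSlot();
            slots.Add(s);
            active.Add(s);
        }

        // Calls #2..#10 finish; call #1 (slot 1) is still running.
        for (int call = 2; call <= 10; call++)
        {
            active.Remove(slots[call - 1]);
        }

        // Call #11 starts: 11 % 10 == 1, the slot call #1 still holds.
        int slot11 = NextSlot();
        return (slot11, active.Contains(slot11));
    }
}
```

Run() returns (1, true): the 11th call reuses the ID of a call that has not finished.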

But I'm new to .NET, and I've been surprised several times to find that I was implementing something that already exists.

Is there a better thread-safe way for each task to recycle IDs from a pool?


Solution

  • Creating resource pools is exactly the situation System.Collections.Concurrent.ConcurrentBag<T> is useful for. Wrap it in a BlockingCollection<T> to make the code easier: Take() waits until an ID is free, and Add() returns it to the pool.

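    using System;
    using System.Collections.Concurrent;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    class Example
    {
        // Pool of reusable IDs; at most concurrentLimit calls hold an ID at once.
        private readonly BlockingCollection<Guid> _guidPool;
        private readonly TransformBlock<Foo, Bar> _transform;

        public Example(int concurrentLimit)
        {
            _guidPool = new BlockingCollection<Guid>(new ConcurrentBag<Guid>(), concurrentLimit);
            for (int i = 0; i < concurrentLimit; i++)
            {
                _guidPool.Add(Guid.NewGuid());
            }

            // The lambda matches Func<Foo, Task<Bar>>, so the block awaits each call.
            _transform = new TransformBlock<Foo, Bar>(foo => SomeAction(foo),
                                                      new ExecutionDataflowBlockOptions
                                                      {
                                                         MaxDegreeOfParallelism = concurrentLimit
                                                         //...
                                                      });
            //...
        }

        private async Task<Bar> SomeAction(Foo foo)
        {
            // Take() blocks if every ID is in use; with MaxDegreeOfParallelism equal
            // to the pool size, an ID is always free when a new call starts.
            var id = _guidPool.Take();
            try
            {
                //...
                return await CallServiceAsync(foo, id);
            }
            finally
            {
                // Always return the ID, even if the call throws.
                _guidPool.Add(id);
            }
        }

        // Hypothetical placeholder for the real web-service call.
        private static Task<Bar> CallServiceAsync(Foo foo, Guid id) =>
            throw new NotImplementedException();
    }

    // Placeholder input and output types.
    class Foo { }
    class Bar { }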
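A self-contained sketch that exercises the same BlockingCollection/ConcurrentBag pool and checks that no two running calls ever share an ID. To keep it short it assumes an ActionBlock instead of the answer's TransformBlock (so nothing has to drain the output), and Task.Delay with varying lengths as a stand-in for the web-service call so that calls finish out of order. IdPoolDemo and RunAsync are hypothetical names; on .NET Framework, System.Threading.Tasks.Dataflow comes from the NuGet package of that name.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class IdPoolDemo
{
    // Sends `items` simulated calls through a block limited to `limit` concurrent
    // executions; each call borrows one of `limit` pooled GUIDs and returns it.
    // Returns the number of distinct IDs used and how many times a call received
    // an ID that another running call was still holding.
    public static async Task<(int DistinctIds, int Collisions)> RunAsync(int limit, int items)
    {
        var pool = new BlockingCollection<Guid>(new ConcurrentBag<Guid>(), limit);
        for (int i = 0; i < limit; i++)
        {
            pool.Add(Guid.NewGuid());
        }

        var inUse = new ConcurrentDictionary<Guid, bool>(); // IDs held by running calls
        var used = new ConcurrentDictionary<Guid, bool>();  // every ID handed out
        int collisions = 0;

        var block = new ActionBlock<int>(async n =>
        {
            Guid id = pool.Take();
            try
            {
                if (!inUse.TryAdd(id, true))
                {
                    Interlocked.Increment(ref collisions);
                }
                used.TryAdd(id, true);

                // Varying delays make calls complete out of order.
                await Task.Delay(1 + (n * 7) % 15);
            }
            finally
            {
                // Mark the ID free before returning it to the pool.
                inUse.TryRemove(id, out _);
                pool.Add(id);
            }
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = limit });

        for (int item = 0; item < items; item++)
        {
            block.Post(item);
        }
        block.Complete();
        await block.Completion;
        return (used.Count, collisions);
    }
}
```

With RunAsync(3, 50), collisions should be 0 and at most 3 distinct IDs should appear, even though the calls finish in a different order than they started.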