Tags: c#, concurrency, task-parallel-library, tpl-dataflow

Should I choose a simple Dictionary or a ConcurrentDictionary when working with the Task Parallel Library?


Here is a simplified scenario in which a user wants to download and process some data:

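// Initialized up front so that TryAdd below does not hit a null reference
private readonly ConcurrentDictionary<int, (string path, string name)> _testDictionary =
    new ConcurrentDictionary<int, (string path, string name)>();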
public async Task StartDownload(List<(int id, string path, string name)> properties)
{
    foreach (var (id, path, name) in properties)
    {
        _testDictionary.TryAdd(id, (path, name));
    }
    await CreatePipeline(properties);
    //after returning I would like to check if _testDictionary contains any elements,
    //and what is their status
}

All incoming items are registered in the ConcurrentDictionary, and then a TPL Dataflow pipeline is called to do the downloading and processing:

public async Task CreatePipeline(List<(int id, string path, string name)> properties)
{
    var downloadBlock = new TransformBlock<(int id, string path, string name), int>(
        (data) => { return data.id; },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
    var resultsBlock = new ActionBlock<int>((data) =>
    {
        _testDictionary.TryRemove(data, out _);
        //or
        //_testDictionary.AddOrUpdate(...);
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
    downloadBlock.LinkTo(resultsBlock,
        new DataflowLinkOptions { PropagateCompletion = true });

    foreach (var item in properties)
    {
        await downloadBlock.SendAsync(item);
    }
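    // Complete the head of the pipeline; completion propagates to resultsBlock
    // only after every item has flowed through. Completing resultsBlock directly
    // could drop items that are still inside downloadBlock.
    downloadBlock.Complete();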
    await resultsBlock.Completion;
}

At the end, the results block removes the item from _testDictionary (or updates it), depending on how the processing went. My question is: if I set MaxDegreeOfParallelism = 1 for all the blocks in my pipeline and make sure that no more than one pipeline runs at the same time, do I really need a ConcurrentDictionary, or would a simple Dictionary be sufficient? I am concerned that the pipeline could be executed on a different thread and that accessing a simple Dictionary from there could lead to problems.


Solution

  • Dataflow

    As far as I can see, from the point of view of _testDictionary your StartDownload acts as a producer and your CreatePipeline as a consumer. The Add and Remove calls are split across two different methods, which is why you had to make that variable a class-level field.

    What if CreatePipeline contained both calls and returned all the unprocessed elements?

    public async Task<Dictionary<int, (string path, string name)>> CreatePipeline(List<(int id, string path, string name)> properties)
    {
        var unprocessed = new ConcurrentDictionary<int, (string path, string name)>(
            properties.ToDictionary(
                prop => prop.id, 
                prop => (prop.path, prop.name)));
    
        // var downloadBlock = ...;
    
        var resultsBlock = new ActionBlock<int>(
            (data) => unprocessed.TryRemove(data, out _), options);
    
        //...
    
        downloadBlock.Complete();
        await resultsBlock.Completion;
    
        return unprocessed.ToDictionary(
            dict => dict.Key,
            dict => dict.Value);
    }
    

    Ordering

    If ordering does not matter, you could consider rewriting the TransformBlock population logic so that all sends are started at once instead of being awaited one by one:

    await Task.WhenAll(properties.Select(downloadBlock.SendAsync));
    

    ImmutableDictionary

    If you want to make sure that the returned unprocessed items can't be modified by other threads, you could take advantage of ImmutableDictionary.

    So, if we put everything together it might look like this:

    public async Task StartDownload(List<(int id, string path, string name)> properties)
    {
        var unprocessedProperties = await CreatePipeline(properties);
        foreach (var property in unprocessedProperties)
        {
            //TODO
        }
    }
    
    public async Task<ImmutableDictionary<int, (string path, string name)>> CreatePipeline(List<(int id, string path, string name)> properties)
    {
        var options = new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 1};
    
        var unprocessed = new ConcurrentDictionary<int, (string path, string name)>(
            properties.ToDictionary(
                prop => prop.id, 
                prop => (prop.path, prop.name)));
    
        var downloadBlock = new TransformBlock<(int id, string path, string name), int>(
            (data) => data.id, options);
    
        var resultsBlock = new ActionBlock<int>(
            (data) => unprocessed.TryRemove(data, out _), options);
    
        downloadBlock.LinkTo(resultsBlock, new DataflowLinkOptions { PropagateCompletion = true });
        await Task.WhenAll(properties.Select(downloadBlock.SendAsync));
    
        downloadBlock.Complete();
        await resultsBlock.Completion;
    
        return unprocessed.ToImmutableDictionary(
            dict => dict.Key, 
            dict => dict.Value); 
    }
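
    In the pipeline above, every item that reaches the results block is removed, so nothing is left to inspect afterwards. If failed items should stay in the dictionary, the download step could pass a success flag along with the id, and only successful items would be removed. A minimal sketch of that idea, assuming a hypothetical TryDownload helper (not part of the original code) that stands in for the real download and simply fails for odd ids:

    ```csharp
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Collections.Immutable;
    using System.Linq;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public static class PipelineSketch
    {
        // Hypothetical stand-in for the real download: odd ids "fail" here.
        private static bool TryDownload((int id, string path, string name) item)
            => item.id % 2 == 0;

        public static async Task<ImmutableDictionary<int, (string path, string name)>> RunAsync(
            List<(int id, string path, string name)> properties)
        {
            var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };

            var unprocessed = new ConcurrentDictionary<int, (string path, string name)>(
                properties.ToDictionary(prop => prop.id, prop => (prop.path, prop.name)));

            // The transform now reports whether the download succeeded.
            var downloadBlock = new TransformBlock<(int id, string path, string name), (int id, bool ok)>(
                data => (data.id, TryDownload(data)), options);

            // Only successfully processed items are removed; failures stay behind.
            var resultsBlock = new ActionBlock<(int id, bool ok)>(result =>
            {
                if (result.ok)
                {
                    unprocessed.TryRemove(result.id, out _);
                }
            }, options);

            downloadBlock.LinkTo(resultsBlock, new DataflowLinkOptions { PropagateCompletion = true });

            foreach (var item in properties)
            {
                await downloadBlock.SendAsync(item);
            }

            downloadBlock.Complete();
            await resultsBlock.Completion;

            return unprocessed.ToImmutableDictionary(pair => pair.Key, pair => pair.Value);
        }
    }
    ```

    With this shape, whatever is returned from RunAsync is exactly the set of items whose download did not succeed, so the caller can retry or report them.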
    

    EDIT: Reflecting the new requirements

    As the OP pointed out, the main reason behind the dictionary is to allow the queue of to-be-processed items to be extended while processing is still in progress.

    In other words, gathering and processing the to-be-processed items is not a one-time thing but rather a continuous activity.

    The good thing is that you can get rid of _testDictionary and resultsBlock entirely. All you need is to continuously Post or Send new data to the TransformBlock. The processing is awaited in a separate method (StopDownload).

    private readonly ITargetBlock<(int id, string path, string name)> downloadBlock;
    
    public MyAwesomeClass()
    {
        var transform = new TransformBlock<(int id, string path, string name), int>(
            (data) => data.id, 
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
    
        // A TransformBlock completes only after its output has been consumed,
        // so discard the results; otherwise StopDownload would never return.
        transform.LinkTo(DataflowBlock.NullTarget<int>());
        downloadBlock = transform;
    }
    
    public void StartDownload(List<(int id, string path, string name)> properties)
    {
        // Starts sending the items, but does not await the sends
        _ = properties.Select(downloadBlock.SendAsync).ToList();
    
        // You can await the send operations if you wish
    }
    
    public async Task StopDownload()
    {
        downloadBlock.Complete();
        await downloadBlock.Completion;
    }
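
    As a usage sketch of this continuous style: batches can be sent at different times, and completion is requested only once, at the end. The Downloader class and its Processed counter below are hypothetical names for illustration. An ActionBlock is swapped in for the TransformBlock here because the sketch has no consumer for the output, and the sends are awaited so the example stays deterministic.

    ```csharp
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public sealed class Downloader
    {
        private readonly ActionBlock<(int id, string path, string name)> _block;
        private int _processed;

        // Number of items handled so far (illustration only).
        public int Processed => Volatile.Read(ref _processed);

        public Downloader()
        {
            _block = new ActionBlock<(int id, string path, string name)>(
                data => { Interlocked.Increment(ref _processed); },
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
        }

        // May be called any number of times before StopDownload.
        public Task StartDownload(IEnumerable<(int id, string path, string name)> properties) =>
            Task.WhenAll(properties.Select(p => _block.SendAsync(p)));

        // Signals that no more items will arrive and waits for the queue to drain.
        public async Task StopDownload()
        {
            _block.Complete();
            await _block.Completion;
        }
    }

    public static class Demo
    {
        public static async Task<int> RunAsync()
        {
            var downloader = new Downloader();

            await downloader.StartDownload(
                new List<(int id, string path, string name)> { (1, "a", "x"), (2, "b", "y") });
            await downloader.StartDownload(
                new List<(int id, string path, string name)> { (3, "c", "z") });

            await downloader.StopDownload();
            return downloader.Processed;
        }
    }
    ```

    No dictionary is shared between callers and the block in this sketch; the block's own input queue is the only place where pending items live.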
    

    The MyAwesomeClass structure can be modified easily to inject a BufferBlock in front of the TransformBlock to smooth the load:

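    private readonly ITargetBlock<(int id, string path, string name)> downloadBlock;
    private readonly Task processingCompletion;
    
    public MyAwesomeBufferedClass()
    {
        var transform = new TransformBlock<(int id, string path, string name), int>(
            (data) => data.id,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
    
        // Discard the output so that the TransformBlock is able to complete
        transform.LinkTo(DataflowBlock.NullTarget<int>());
    
        var buffer = new BufferBlock<(int id, string path, string name)>(
            new DataflowBlockOptions { BoundedCapacity = 100 });
    
        buffer.LinkTo(transform, new DataflowLinkOptions { PropagateCompletion = true });
        downloadBlock = buffer;
    
        // The buffer completes as soon as it has handed over its last item,
        // so track the completion of the last block in the chain instead.
        processingCompletion = transform.Completion;
    }
    
    public void StartDownload(List<(int id, string path, string name)> properties)
    {
        // Sends that are still waiting for free capacity when StopDownload
        // is called are declined, so await them if no item may be lost.
        _ = properties.Select(downloadBlock.SendAsync).ToList();
    }
    
    public async Task StopDownload()
    {
        downloadBlock.Complete();
        await processingCompletion;
    }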
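
    Once a block has a BoundedCapacity, the choice between Post and SendAsync starts to matter: Post returns false immediately when the block is full, while SendAsync returns a task that completes once the block has accepted (or permanently declined) the item. A small sketch to illustrate the difference, using a capacity of 2 instead of 100 and a hypothetical BoundedSendSketch class name:

    ```csharp
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public static class BoundedSendSketch
    {
        public static async Task<(bool postAccepted, bool sendAccepted, int first)> RunAsync()
        {
            var buffer = new BufferBlock<int>(
                new DataflowBlockOptions { BoundedCapacity = 2 });

            // Fill the buffer up to its capacity.
            buffer.Post(1);
            buffer.Post(2);

            // The buffer is full: Post does not wait, it reports the rejection.
            bool postAccepted = buffer.Post(3);

            // SendAsync postpones the item until there is room for it.
            Task<bool> pendingSend = buffer.SendAsync(3);

            // Taking one item out frees a slot for the postponed item.
            int first = await buffer.ReceiveAsync();
            bool sendAccepted = await pendingSend;

            return (postAccepted, sendAccepted, first);
        }
    }
    ```

    This is why the examples above use SendAsync rather than Post: with a bounded buffer, Post would silently drop items whenever the producer gets ahead of the consumer, unless its return value is checked.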