Search code examples
c#.nettask-parallel-librarydataflowtpl-dataflow

TPL Dataflow block consumes all available memory


I have a TransformManyBlock with the following design:

  • Input: Path to a file
  • Output: IEnumerable of the file's contents, one line at a time

I am running this block on a huge file (61GB), which is too large to fit into RAM. In order to avoid unbounded memory growth, I have set BoundedCapacity to a very low value (e.g. 1) for this block, and all downstream blocks. Nonetheless, the block apparently iterates the IEnumerable greedily, which consumes all available memory on the computer, grinding every process to a halt. The OutputCount of the block continues to rise without bound until I kill the process.

What can I do to prevent the block from consuming the IEnumerable in this way?

EDIT: Here's an example program that illustrates the problem:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static IEnumerable<string> GetSequence(char c)
    {
        for (var i = 0; i < 1024 * 1024; ++i)
            yield return new string(c, 1024 * 1024);
    }

    static void Main(string[] args)
    {
        var options = new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 };
        var firstBlock = new TransformManyBlock<char, string>(c => GetSequence(c), options);
        var secondBlock = new ActionBlock<string>(str =>
            {
                Console.WriteLine(str.Substring(0, 10));
                Thread.Sleep(1000);
            }, options);

        firstBlock.LinkTo(secondBlock);
        firstBlock.Completion.ContinueWith(task =>
            {
                if (task.IsFaulted) ((IDataflowBlock) secondBlock).Fault(task.Exception);
                else secondBlock.Complete();
            });

        firstBlock.Post('A');
        firstBlock.Complete();
        for (; ; )
        {
            Console.WriteLine("OutputCount: {0}", firstBlock.OutputCount);
            Thread.Sleep(3000);
        }
    }
}

If you're on a 64-bit box, make sure to clear the "Prefer 32-bit" option in Visual Studio. I have 16GB of RAM on my computer, and this program immediately consumes every available byte.


Solution

  • You seem to misunderstand how TPL Dataflow works.

    BoundedCapacity limits the amount of items you can post into a block. In your case that means a single char into the TransformManyBlock and single string into the ActionBlock.

    So you post a single item to the TransformManyBlock which then returns 1024*1024 strings and tries to pass them on to the ActionBlock which will only accept a single one at a time. The rest of the strings will just sit there in the TransformManyBlock's output queue.

    What you probably want to do is create a single block and post items into it in a streaming fashion by waiting (synchronously or otherwise) when it's capacity is reached:

    private static void Main()
    {
        MainAsync().Wait();
    }
    
    private static async Task MainAsync()
    {
        var block = new ActionBlock<string>(async item =>
        {
            Console.WriteLine(item.Substring(0, 10));
            await Task.Delay(1000);
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
    
        foreach (var item in GetSequence('A'))
        {
            await block.SendAsync(item);
        }
    
        block.Complete();
        await block.Completion;
    }