I have a TransformManyBlock with the following design:
I am running this block on a huge file (61GB), which is too large to fit into RAM. In order to avoid unbounded memory growth, I have set BoundedCapacity to a very low value (e.g. 1) for this block and for all downstream blocks. Nonetheless, the block apparently iterates the IEnumerable greedily, which consumes all available memory on the computer, grinding every process to a halt. The OutputCount of the block continues to rise without bound until I kill the process.
What can I do to prevent the block from consuming the IEnumerable in this way?
EDIT: Here's an example program that illustrates the problem:
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    class Program
    {
        static IEnumerable<string> GetSequence(char c)
        {
            for (var i = 0; i < 1024 * 1024; ++i)
                yield return new string(c, 1024 * 1024);
        }

        static void Main(string[] args)
        {
            var options = new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 };
            var firstBlock = new TransformManyBlock<char, string>(c => GetSequence(c), options);
            var secondBlock = new ActionBlock<string>(str =>
            {
                Console.WriteLine(str.Substring(0, 10));
                Thread.Sleep(1000);
            }, options);

            firstBlock.LinkTo(secondBlock);
            firstBlock.Completion.ContinueWith(task =>
            {
                if (task.IsFaulted) ((IDataflowBlock) secondBlock).Fault(task.Exception);
                else secondBlock.Complete();
            });

            firstBlock.Post('A');
            firstBlock.Complete();

            for (; ; )
            {
                Console.WriteLine("OutputCount: {0}", firstBlock.OutputCount);
                Thread.Sleep(3000);
            }
        }
    }
If you're on a 64-bit box, make sure to clear the "Prefer 32-bit" option in Visual Studio. I have 16GB of RAM on my computer, and this program immediately consumes every available byte.
You seem to misunderstand how TPL Dataflow works.
BoundedCapacity limits the number of items you can post into a block. In your case that means a single char into the TransformManyBlock and a single string into the ActionBlock.
So you post a single item to the TransformManyBlock, which then returns 1024*1024 strings and tries to pass them on to the ActionBlock, which will only accept a single one at a time. The rest of the strings will just sit there in the TransformManyBlock's output queue.
What you probably want to do is create a single block and post items into it in a streaming fashion, waiting (synchronously or otherwise) whenever its capacity is reached:
    private static void Main()
    {
        MainAsync().Wait();
    }

    private static async Task MainAsync()
    {
        var block = new ActionBlock<string>(async item =>
        {
            Console.WriteLine(item.Substring(0, 10));
            await Task.Delay(1000);
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        foreach (var item in GetSequence('A'))
        {
            await block.SendAsync(item);
        }

        block.Complete();
        await block.Completion;
    }
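Applied to the file scenario from the question, the same pattern might look like the sketch below. It assumes the file can be read line by line with File.ReadLines, which enumerates lazily; the class name StreamingExample, the helper ProcessFileAsync, and the temporary test file are hypothetical names used only for illustration, and the per-line work is a placeholder. An async Main needs C# 7.1 or later.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class StreamingExample
{
    // Hypothetical helper: streams a file line by line into a bounded block
    // and returns the number of lines the block processed.
    public static async Task<int> ProcessFileAsync(string path)
    {
        var processed = 0;
        var block = new ActionBlock<string>(line =>
        {
            // Placeholder for the real per-line work.
            Interlocked.Increment(ref processed);
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        // File.ReadLines is lazy: the next line is read only when the loop asks for it.
        foreach (var line in File.ReadLines(path))
        {
            // SendAsync completes once the block accepts the item, so the loop
            // cannot run ahead of the consumer. It returns false if the block
            // declines the item (for example, after it has faulted).
            if (!await block.SendAsync(line))
                break;
        }

        block.Complete();
        await block.Completion;
        return processed;
    }

    static async Task Main()
    {
        // Small temporary file standing in for the real (huge) input.
        var path = Path.GetTempFileName();
        File.WriteAllLines(path, new[] { "first", "second", "third" });

        Console.WriteLine(await ProcessFileAsync(path));
        File.Delete(path);
    }
}
```

Because the producer loop awaits SendAsync, at most one line is buffered in the block at a time, and memory use should stay roughly constant regardless of the file size.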