I'm experimenting with TPL Dataflow before porting it into my production code. The production code is a classical producer/consumer system: producer(s) produce messages (related to the financial domain) and consumers process those messages.

What I'm interested in is how stable the system stays if at some point the producer(s) produce much faster than the consumers can handle (will the system blow up, or what will happen?) and, more importantly, what to do in those cases.

In an attempt to reproduce this in a similarly simple application, I came up with the following:
var bufferBlock = new BufferBlock<Item>();
var executiondataflowBlockOptions = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount,
    BoundedCapacity = 100000
};
var dataFlowLinkOptions = new DataflowLinkOptions
{
    PropagateCompletion = true
};
var actionBlock1 = new ActionBlock<Item>(t => ProcessItem(t),
    executiondataflowBlockOptions);
bufferBlock.LinkTo(actionBlock1, dataFlowLinkOptions);

for (int i = 0; i < int.MaxValue; i++)
{
    bufferBlock.SendAsync(GenerateItem());
}
bufferBlock.Complete();
Console.ReadLine();
Item is a very simple class:
internal class Item
{
    public Item(string itemId)
    {
        ItemId = itemId;
    }

    public string ItemId { get; }
}
GenerateItem simply news up an Item:
static Item GenerateItem()
{
    return new Item(Guid.NewGuid().ToString());
}
Now, to imitate a not-so-fast consumer, I made ProcessItem wait for 100 ms:
static async Task ProcessItem(Item item)
{
    await Task.Delay(TimeSpan.FromMilliseconds(100));
    Console.WriteLine($"Processing #{item.ItemId} item.");
}
Executing this results in an OutOfMemoryException after 20 or so seconds.

Then I went on and added more consumers (more ActionBlocks, up to 10), which buys some more time but eventually leads to the same OOM exception. For reference, the multi-consumer variant was essentially the sketch below.
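(A sketch; the list and loop are my reconstruction, the options are the ones defined above.)

var consumers = new List<ActionBlock<Item>>();
for (int i = 0; i < 10; i++)
{
    // All consumers share the same bounded options. The BufferBlock offers
    // each item to its linked targets in link order, so when one ActionBlock
    // is at capacity the item overflows to the next one.
    var consumer = new ActionBlock<Item>(t => ProcessItem(t),
        executiondataflowBlockOptions);
    bufferBlock.LinkTo(consumer, dataFlowLinkOptions);
    consumers.Add(consumer);
}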
I also noticed that the GC is under huge pressure (the VS 2015 Diagnostics tool shows the GC running almost all the time), so I introduced object pooling for Item (a very simple one, essentially a ConcurrentBag storing the items), but I'm still hitting the same wall (the OOM exception is thrown).
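The pool is roughly the following (a minimal sketch of what I mean by "very simple"; note it assumes ItemId is given a setter so recycled instances can be reassigned, unlike the read-only Item shown above):

internal class ItemPool
{
    private readonly ConcurrentBag<Item> _bag = new ConcurrentBag<Item>();

    public Item Rent(string itemId)
    {
        Item item;
        if (!_bag.TryTake(out item))
            return new Item(itemId);
        item.ItemId = itemId; // assumes a setter added for pooling
        return item;
    }

    public void Return(Item item)
    {
        _bag.Add(item);
    }
}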
To give some details on what is in memory and why it is running out of it:

- Most of the memory is held by SingleProducerSingleConsumerQueue+Segment<TplDataFlow.Item> and ConcurrentQueue+Segment<TplDataFlow.Item> instances.
- The BufferBlock's InputBuffer is full of Items (Count=14,562,296).
- Since BoundedCapacity is set on the ActionBlock(s), their input buffers are also close to the configured number (InputCount=99,996).

To check whether a slower producer would let the consumers keep up, I made the producer sleep between iterations:
for (int i = 0; i < int.MaxValue; i++)
{
    Thread.Sleep(TimeSpan.FromMilliseconds(50));
    bufferBlock.SendAsync(GenerateItem());
}
And it works fine: no exception is thrown, memory usage stays low, and I don't see any GC pressure anymore.
So I have a few questions:

The BufferBlock's internal buffer gets filled with messages very fast, and it holds on to those messages until one of the consumers comes back to ask for the next one; as a result, the application runs out of memory (because of the filled-up internal buffer of the BufferBlock). Would you agree with this?

I'm using the Microsoft.Tpl.Dataflow package, version 4.5.24, on .NET 4.5 (C# 6). The process is 32-bit.
You've identified the problem nicely: the BufferBlock is filling its input buffer until it hits an OOM.

To solve this, set the BoundedCapacity option on your buffer block as well. This throttles the producers automatically for you (no need for the Thread.Sleep in your producer), but it also means the producer must await SendAsync, whose returned task completes only once the block has accepted the item.
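Something along these lines (a sketch based on your code, not a drop-in program; the loop has to live in an async method, since C# 6 has no async Main):

var bufferBlock = new BufferBlock<Item>(new DataflowBlockOptions
{
    BoundedCapacity = 100000
});

// Awaiting SendAsync suspends the producer, without blocking a thread,
// whenever the bounded BufferBlock is full, so memory usage stays capped.
for (int i = 0; i < int.MaxValue; i++)
{
    await bufferBlock.SendAsync(GenerateItem());
}
bufferBlock.Complete();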