I have a Dataflow pipeline with a TransformBlock that runs an async disk read method concurrently. For example, reading several files from a disk array benefits from a read queue depth greater than one.
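(For reference, a sketch of the pipeline shape in question; the wiring is illustrative, and ReadFileAsync/Process are placeholder methods, not my actual code.)

var readerBlock = new TransformBlock<string, byte[]>(
    path => ReadFileAsync(path),   // async disk read, several in flight at once
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

var lastActionBlock = new ActionBlock<byte[]>(bytes => Process(bytes));

readerBlock.LinkTo(lastActionBlock,
    new DataflowLinkOptions { PropagateCompletion = true });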
Intuitively, all pipeline work should have finished when the last ActionBlock completes. However, the following happens:
Let's say Rabbit and Bear begin fetching two files into their own byte buffers. Rabbit throws an EatenByAWolf exception. Bear completes later, since its file is a bit larger. By the time Bear wakes from its winter-long hibernation, the EatenByAWolf exception has already propagated to the Completion wait site of the last ActionBlock. The problem is that this is where I clean up the byte buffers for Rabbit and Bear alike, causing Bear to choke on a NullRefToBuffer exception.
What is the recommended approach? Should I also wait for the reader block (the one containing all the animals) to finish before clearing the buffers, or can this be handled more elegantly?
Task.WaitAll(readerBlock.Completion, lastActionBlock.Completion);
versus
lastActionBlock.Completion.Wait();
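Spelled out, the first option would look roughly like this (ClearBuffers is a placeholder for my cleanup, and EatenByAWolfException stands in for the exception above):

try
{
    // Waiting on BOTH completions guarantees no read is still in flight,
    // faulted pipeline or not, before the buffers are touched.
    Task.WaitAll(readerBlock.Completion, lastActionBlock.Completion);
}
catch (AggregateException ex)
{
    ex.Handle(e => e is EatenByAWolfException);  // swallow only the expected fault
}
ClearBuffers();  // safe: every animal has finished (or died) by now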
I was managing the byte buffers at the wrong level of abstraction.
The solution for me was to use a stateful reader object when creating the TransformBlock that performs multiple async reads. Adding a ConcurrentBag<> of buffers to the MyFileReaderWithCachingByteBuffers reader object ensures that the buffers aren't cleaned up until the reader object captured by the closure goes out of scope, which happens only when all read operations have completed.
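A minimal sketch of such a reader, assuming a simple take-or-allocate buffer cache; the class and MyObject come from my code below, but the internals here (MyObject.Parse, the FileStream settings) are illustrative:

using System.Collections.Concurrent;
using System.IO;
using System.Threading.Tasks;

class MyFileReaderWithCachingByteBuffers
{
    // The bag pins the buffers' lifetime to this object: as long as the
    // TransformBlock's delegate holds the reader, nothing is cleaned up.
    private readonly ConcurrentBag<byte[]> _buffers = new ConcurrentBag<byte[]>();
    private readonly int _bufferSize;

    public MyFileReaderWithCachingByteBuffers(int bufferSize)
    {
        _bufferSize = bufferSize;
    }

    public async Task<MyObject> ReadAsync(string path)
    {
        // Reuse a cached buffer when one is free; otherwise allocate.
        if (!_buffers.TryTake(out byte[] buffer))
            buffer = new byte[_bufferSize];
        try
        {
            using (var stream = new FileStream(path, FileMode.Open,
                FileAccess.Read, FileShare.Read, 4096, useAsync: true))
            {
                int bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length);
                // Parse/copy before the buffer goes back into circulation.
                return MyObject.Parse(buffer, bytesRead);
            }
        }
        finally
        {
            _buffers.Add(buffer);  // back to the cache, never freed mid-pipeline
        }
    }
}

The reader is then created once and captured by the block's delegate: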
var fileReader = new MyFileReaderWithCachingByteBuffers(biggestFileSize);

var readerBlock = new TransformBlock<string, MyObject>(
    animal => fileReader.ReadAsync(animal),  // delegate captures the reader
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = customDOP });
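Because the delegate captures fileReader, the reader and its ConcurrentBag stay reachable until readerBlock has finished every read, faulted or not, so the Bear scenario can no longer dereference a cleared buffer.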