I have a Dataflow pipeline that consists of multiple blocks processing heterogeneous documents (XLS, PDF, etc.). Each type of document is processed by a dedicated `TransformBlock`. At the end of the pipeline there is an `ActionBlock` that receives all the processed documents and uploads them one by one to a web server. My problem is that I can't find a way to satisfy the requirement of uploading the documents in the same order they initially entered the pipeline. For example, I can't use the `EnsureOrdered` option to my advantage, because that option configures the behavior of a single block, not the behavior of multiple blocks working in parallel. My requirements are:
For example, it is required that document #8 must be uploaded after document #7, even if it finishes processing before document #7.
The fifth requirement means that I can't wait for all the documents to be processed, then sort them by index, and finally upload them. The uploading must happen concurrently with the processing.
Here is a minimal example of what I'm trying to do. For simplicity I am not feeding the blocks with instances of the `IDocument` interface, but with simple integers. The value of each integer represents the order in which it entered the pipeline, and the order in which it must be uploaded:
```csharp
var xlsBlock = new TransformBlock<int, int>(document =>
{
    int duration = 300 + document % 3 * 300;
    Thread.Sleep(duration); // Simulate CPU-bound work
    return document;
});
var pdfBlock = new TransformBlock<int, int>(document =>
{
    int duration = 100 + document % 5 * 200;
    Thread.Sleep(duration); // Simulate CPU-bound work
    return document;
});
var uploader = new ActionBlock<int>(async document =>
{
    Console.WriteLine($"Uploading document #{document}");
    await Task.Delay(500); // Simulate I/O-bound work
});
xlsBlock.LinkTo(uploader);
pdfBlock.LinkTo(uploader);

foreach (var document in Enumerable.Range(1, 10))
{
    if (document % 2 == 0)
        xlsBlock.Post(document);
    else
        pdfBlock.Post(document);
}
xlsBlock.Complete();
pdfBlock.Complete();
_ = Task.WhenAll(xlsBlock.Completion, pdfBlock.Completion)
    .ContinueWith(_ => uploader.Complete());
await uploader.Completion;
```
The output is:
```
Uploading document #1
Uploading document #2
Uploading document #3
Uploading document #5
Uploading document #4
Uploading document #7
Uploading document #6
Uploading document #9
Uploading document #8
Uploading document #10
```
The desirable order is #1, #2, #3, #4, #5, #6, #7, #8, #9, #10.
How can I restore the order of the processed documents before sending them to the `uploader` block?
Clarification: Drastically changing the schema of the pipeline, by replacing the multiple specific `TransformBlock`s with a single generic `TransformBlock`, is not an option. The ideal scenario would be to intercept a single block between the processors and the uploader that restores the order of the documents.
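The "intercept a single block" idea can be sketched as follows. This is a minimal illustration, not part of the original question: the `OrderRestoringBlock` helper name is invented, and it assumes each item's value is its sequential index, as in the example above.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static class OrderRestoringBlock
{
    // Creates a block that receives items tagged with a sequential index
    // (here the item itself is its index) and propagates them strictly in
    // index order, buffering any item that arrives early.
    public static IPropagatorBlock<int, int> Create(int firstIndex = 1)
    {
        var buffer = new SortedDictionary<int, int>(); // keyed by index
        int nextIndex = firstIndex;

        // The default ExecutionDataflowBlockOptions use
        // MaxDegreeOfParallelism = 1, so the lambda below never runs
        // concurrently with itself and needs no locking.
        return new TransformManyBlock<int, int>(item =>
        {
            buffer[item] = item;
            var ready = new List<int>();
            while (buffer.TryGetValue(nextIndex, out var doc))
            {
                buffer.Remove(nextIndex);
                ready.Add(doc);
                nextIndex++;
            }
            return ready; // empty if the next expected item hasn't arrived
        });
    }
}
```

The processors would then link to this block instead of directly to the uploader (`xlsBlock.LinkTo(sorter); pdfBlock.LinkTo(sorter); sorter.LinkTo(uploader);`), so out-of-order items wait in the buffer while in-order items flow through immediately.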
The `uploader` should add each document into some sorted list of completed documents, and check whether the added document is the one that should be uploaded next. If it is, it should remove and upload documents from the sorted list until one is missing.
There is also a synchronization problem. Access to this sorted list must be synchronized across threads, but you want all threads to be doing something instead of waiting for other threads to complete their work. So the `uploader` should work with the list like this:

1. Lock the list and add the completed document. If the document is not the next one to upload, or if the `upload_in_progress` flag is set, then do nothing and return.
2. Otherwise, remove the document and any consecutive successors from the list, set the `upload_in_progress` flag, release the lock, and upload the removed documents.
3. When the upload finishes, lock the list again and clear the `upload_in_progress` flag. If more consecutive documents arrived in the meantime, repeat from step 2; otherwise return.

I hope I imagined it right. As you can see, it's tricky to make it both safe and efficient. There is surely a way to do it with only one lock in most of the cases, but it wouldn't add much to efficiency. The `upload_in_progress` flag is shared between tasks, like the list itself.
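The steps above could be sketched like this. It is a rough illustration, not a tested implementation: `OrderedUploader` and `OnDocumentCompletedAsync` are invented names, and documents are again plain ints whose value is their sequential index, matching the question's example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class OrderedUploader
{
    private readonly object _lock = new object();
    private readonly SortedDictionary<int, int> _completed
        = new SortedDictionary<int, int>();
    private readonly Func<int, Task> _uploadAsync;
    private int _nextToUpload;
    private bool _uploadInProgress; // the shared upload_in_progress flag

    public OrderedUploader(Func<int, Task> uploadAsync, int firstIndex = 1)
    {
        _uploadAsync = uploadAsync;
        _nextToUpload = firstIndex;
    }

    public async Task OnDocumentCompletedAsync(int document)
    {
        lock (_lock)
        {
            _completed.Add(document, document);
            // Step 1: if another task is already uploading, or this
            // document is not the next one in order, do nothing.
            if (_uploadInProgress || !_completed.ContainsKey(_nextToUpload))
                return;
            _uploadInProgress = true; // this task becomes the uploader
        }
        while (true)
        {
            var batch = new List<int>();
            lock (_lock)
            {
                // Step 2: remove every consecutive document that is ready.
                while (_completed.TryGetValue(_nextToUpload, out var doc))
                {
                    _completed.Remove(_nextToUpload);
                    batch.Add(doc);
                    _nextToUpload++;
                }
                if (batch.Count == 0)
                {
                    // Step 3: nothing more is ready; clear the flag so a
                    // later completion can take over, and return.
                    _uploadInProgress = false;
                    return;
                }
            }
            foreach (var doc in batch)
                await _uploadAsync(doc); // I/O happens outside the lock
        }
    }
}
```

Note that the lock is only held while touching the list and the flag, never across the `await`, so processing threads are never blocked behind an upload in progress.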