c# | task-parallel-library | tpl-dataflow

How to restore the order of a shuffled Dataflow pipeline?


I have a Dataflow pipeline that consists of multiple blocks processing heterogeneous documents (XLS, PDF etc.). Each type of document is processed by a dedicated TransformBlock. At the end of the pipeline there is an ActionBlock that receives all the processed documents and uploads them one by one to a web server.

My problem is that I can't find a way to satisfy the requirement of uploading the documents in the same order they initially entered the pipeline. For example, I can't use the EnsureOrdered option to my advantage, because that option configures the behavior of a single block, not the behavior of multiple blocks working in parallel. My requirements are:

  1. Insert the documents in the pipeline in a specific order.
  2. Process each document differently, depending on its type.
  3. The documents of a specific type should be processed sequentially.
  4. Documents of different types can (and should) be processed in parallel.
  5. All documents should be uploaded ASAP after they are processed.
  6. The documents must be uploaded sequentially, and in the same order they were entered in the pipeline.

For example, document #8 must be uploaded after document #7, even if it finishes processing before document #7.

The fifth requirement means that I can't wait for all documents to be processed, then sort them by index, and finally upload them. The uploading must happen concurrently with the processing.

Here is a minimal example of what I'm trying to do. For simplicity I am not feeding the blocks with instances of the IDocument interface, but with simple integers. The value of each integer represents the order in which it was entered in the pipeline, and the order in which it must be uploaded:

var xlsBlock = new TransformBlock<int, int>(document =>
{
    int duration = 300 + document % 3 * 300;
    Thread.Sleep(duration); // Simulate CPU-bound work
    return document;
});
var pdfBlock = new TransformBlock<int, int>(document =>
{
    int duration = 100 + document % 5 * 200;
    Thread.Sleep(duration); // Simulate CPU-bound work
    return document;
});

var uploader = new ActionBlock<int>(async document =>
{
    Console.WriteLine($"Uploading document #{document}");
    await Task.Delay(500); // Simulate I/O-bound work
});

xlsBlock.LinkTo(uploader);
pdfBlock.LinkTo(uploader);

foreach (var document in Enumerable.Range(1, 10))
{
    if (document % 2 == 0)
        xlsBlock.Post(document);
    else
        pdfBlock.Post(document);
}
xlsBlock.Complete();
pdfBlock.Complete();
_ = Task.WhenAll(xlsBlock.Completion, pdfBlock.Completion)
    .ContinueWith(_ => uploader.Complete());

await uploader.Completion;

The output is:

Uploading document #1
Uploading document #2
Uploading document #3
Uploading document #5
Uploading document #4
Uploading document #7
Uploading document #6
Uploading document #9
Uploading document #8
Uploading document #10


The desirable order is #1, #2, #3, #4, #5, #6, #7, #8, #9, #10.

How can I restore the order of the processed documents, before sending them to the uploader block?

Clarification: Drastically changing the shape of the pipeline, by replacing the multiple type-specific TransformBlocks with a single generic TransformBlock, is not an option. Ideally I would intercept a single block between the processors and the uploader that restores the order of the documents.


Solution

  • uploader should add the document to a sorted list of completed documents, and check whether the added document is the one that should be uploaded next. If it is, remove and upload documents from the sorted list until one is missing.

    There is also a synchronization problem: access to this sorted list must be synchronized across threads. But you want all threads to keep doing useful work instead of waiting for other threads to finish theirs. So uploader should work with the list like this:

    • Within a sync lock, add the new document to the list; if the upload_in_progress flag is already set, release the lock and return (the task that set the flag will pick this document up); otherwise set the upload_in_progress flag and release the lock.
    • In a loop:
      • enter the same sync lock again,
      • check whether the document at the head of the list is the next one to be uploaded,
        • if not (or the list is empty), reset the upload_in_progress flag, release the lock, and return,
        • otherwise remove the document from the list,
        • release the lock,
        • upload the document.

    I hope I have imagined it right. As you can see, it's tricky to make it both safe and efficient. There is surely a way to do it with only one lock acquisition in most cases, but it wouldn't improve efficiency much. Note that the upload_in_progress flag is shared between tasks, just like the list itself.
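The steps above can be sketched as a small helper that the uploader block delegates to. This is a minimal sketch under the question's assumptions, not a definitive implementation: the OrderedUploader name and the injected uploadAsync delegate are illustrative, and it assumes each document's integer value is its 1-based position in the input order.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper that buffers out-of-order documents and drains
// them strictly in sequence, using the lock + upload_in_progress
// scheme described above.
public class OrderedUploader
{
    private readonly object _gate = new object();
    private readonly SortedDictionary<int, int> _pending = new SortedDictionary<int, int>();
    private int _nextIndex = 1;       // next document allowed to upload
    private bool _uploadInProgress;   // true while some task is draining

    public async Task ReceiveAsync(int document, Func<int, Task> uploadAsync)
    {
        lock (_gate)
        {
            _pending.Add(document, document);
            if (_uploadInProgress) return; // the draining task will see it
            _uploadInProgress = true;      // this task becomes the drainer
        }
        while (true)
        {
            int toUpload;
            lock (_gate)
            {
                if (!_pending.TryGetValue(_nextIndex, out toUpload))
                {
                    _uploadInProgress = false; // gap in the sequence; stop
                    return;
                }
                _pending.Remove(_nextIndex);
                _nextIndex++;
            }
            await uploadAsync(toUpload);   // upload outside the lock
        }
    }
}
```

The uploader from the question would then call sequencer.ReceiveAsync(document, uploadDelegate) in its ActionBlock body instead of uploading directly, so out-of-order arrivals are parked in the sorted list and drained as soon as the sequence becomes contiguous.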