Search code examples
c#.netparallel-processingtask-parallel-librarystream-processing

Good way to parallel process elements of 'stream' while keeping output in order


I have an app that receives a stream of XML events from Kafka. These events have to be deserialized/parsed and otherwise converted, before being handed in-order to some business logic. (This logic then on emits other events on the output side).

The parsing/conversion code is stateless, while the domain code is stateful and has to receive events in order. These two steps are decoupled through use of a System.Threading Channel so that parsing step gets a full 'thread'/'cpu' (async task).

My challenge is that the parsing is CPU heavy, and it hits a 100% CPU on one core thereby being the bottleneck for service throughput. I've tried to use multi-threading / parallel processing, and this has improved the throughput somewhat. However my approach seems un-elegant, and potentially with a lot of overhead.

In the parsing step I've used Task.Run() to spawn a Task for each 'item', and then added the Task to the output queue ensuring the Tasks are added according to input order. The consumer then pulls tasks from the Channel one at a time, and waits for it to complete with a result before continuing.

This means I'm creating and submitting a large amount of Tasks, and in general seems like I'm using a lot of thread coordinating operations in the hot-path.

Was hoping someone here would have a good approach for processing items in order while respecting the ordering of the output.


Solution

  • So you have a Channel<Task<T>> as a conveyor belt, the producer adds tasks with channel.Writer.TryWrite(Task.Run(() => Parse(item))), and the consumer reads the tasks and awaits them the one after the other:

    await foreach (Task<T> task in channel.Reader.ReadAllAsync())
    {
        T result = await task;
        // Do something with the result
    }
    

    This is a quite good setup. A disadvantage is that you are not controlling the degree of parallelism. So at some moments you might have too many Task.Run actions running in parallel, resulting in ThreadPool starvation, that might affect negatively other parts of your application. You can solve this problem by scheduling the work with the more advanced Task.Factory.StartNew instead of the Task.Run, and configure the scheduler argument with the ConcurrentScheduler property of a shared ConcurrentExclusiveSchedulerPair instance.

    Another approach is to replace the channel with a TransformBlock<TInput,TOutput> from the TPL Dataflow library. This component combines an input buffer, an output buffer, and a processor that transforms the TInput to TOutput. It is equipped out of the box with parallel capabilities and order preservation. Here is an example:

    TransformBlock<Item, Result> block = new(item =>
    {
        return Parse(item);
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 2, // Configurable, the default is 1
        EnsureOrdered = true, // This is the default
    });
    

    The producer feeds the block with block.Post(item), and the consumer enumerates the output buffer of the block with the ReceiveAllAsync method:

    await foreach (var result in block.ReceiveAllAsync())
    {
        // Do something with the result
    }
    await block.Completion;
    

    The await block.Completion; at the end is needed because the ReceiveAllAsync method currently has a bug, and doesn't propagate possible exceptions as part of the enumeration.

    My expectation is that the TransformBlock approach should have less overhead, and consume less memory than your current setup. The TPL Dataflow library is advertised by Microsoft as suitable for "coarse-grained dataflow and pipelining tasks". This means that your Parse method should be chunky. In case it is feather-weight, like parsing a single number, most likely the benefits of parallelization will be negated by the synchronization overhead. In that case the solution might be to chunkify the work using a BatchBlock<T>.

    The TPL Dataflow library is not exactly cutting edge technology. It predates ValueTasks and so it doesn't take advantage of them. It also comes with some quirks, like swallowing OperationCanceledExceptions that might be thrown by the transform delegate. It is also very difficult to extend. Although it should be better than what you have already, it's not the absolutely optimal solution, but it might be good enough for your needs.