Tags: c#, asynchronous, task, task-parallel-library, method-chaining

Chaining an arbitrary number of tasks together in C#.NET


What I have

I have a set of asynchronous processing methods, similar to:

public class AsyncProcessor<T>
{
    //...rest of members, etc.

    public Task Process(T input)
    {
        //Some special processing, most likely inside a Task, so
        //maybe spawn a new Task, etc.
        Task task = Task.Run(/* maybe private method that does the processing*/);
        return task;
    }
}

What I want

I would like to chain them all together, to execute in sequential order.

What I tried

I have tried to do the following:

public class CompositeAsyncProcessor<T>
{
    private readonly IEnumerable<AsyncProcessor<T>> m_processors;

    //Constructor receives the IEnumerable<AsyncProcessor<T>> and
    //stores it in the field above.

    public Task ProcessInput(T input)
    {
        Task chainedTask = Task.CompletedTask;

        foreach (AsyncProcessor<T> processor in m_processors)
        {
            chainedTask = chainedTask.ContinueWith(t => processor.Process(input));
        }

        return chainedTask;
    }
}

What went wrong

However, the tasks do not run in order. As far as I understand, the delegate passed to ContinueWith calls processor.Process(input), which returns immediately with a task that is still running, so the continuation completes regardless of the status of that returned task. Therefore, all processing Tasks still begin almost simultaneously.

My question

My question is whether there is something elegant that I can do to chain the tasks in order (i.e. without execution overlap). For example, could I achieve this using the following statement? (I am struggling a bit with the details.)

chainedTask = chainedTask.ContinueWith(async t => await processor.Process(input));

Also, how would I do this without using async/await, only ContinueWith?

Why would I want to do this?

Because my Processor objects have access to, and request things from, "thread-unsafe" resources. Also, I cannot just await all the methods one by one, because I do not know how many there are, so I cannot simply write out the necessary lines of code.

What do I mean by thread-unsafe? A specific problem

Because I may be using the term incorrectly, an illustration may explain it better. Among the "resources" used by my Processor objects, all of them have access to an object such as the following:

public interface IRepository
{
    void Add(object obj);

    bool Remove(object obj);

    IEnumerable<object> Items { get; }
}

The implementation currently used is relatively naive. So some Processor objects add things, while others retrieve the Items for inspection. Naturally, one of the exceptions I get all too often is:

InvalidOperationException: Collection was modified, enumeration operation may not execute.

I could spend some time locking access and pre-running the enumerations. However, that is my second option; my first thought was simply to make the processes run sequentially.
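
A minimal sketch of that locking option, for comparison. LockedRepository is a hypothetical name, not the actual implementation; it assumes a single lock around a List<object> and returns a snapshot copy from Items, so callers never enumerate the live list while another thread modifies it.

```csharp
using System.Collections.Generic;

// Interface copied from the question so that the sketch is self-contained.
public interface IRepository
{
    void Add(object obj);

    bool Remove(object obj);

    IEnumerable<object> Items { get; }
}

// Hypothetical implementation: every access goes through one lock.
public sealed class LockedRepository : IRepository
{
    private readonly object m_gate = new object();
    private readonly List<object> m_items = new List<object>();

    public void Add(object obj)
    {
        lock (m_gate) { m_items.Add(obj); }
    }

    public bool Remove(object obj)
    {
        lock (m_gate) { return m_items.Remove(obj); }
    }

    // Returns a copy taken under the lock ("pre-running the enumeration"),
    // so later Add/Remove calls cannot invalidate the caller's enumerator.
    public IEnumerable<object> Items
    {
        get
        {
            lock (m_gate) { return m_items.ToArray(); }
        }
    }
}
```

The copy costs one allocation per read, which may or may not be acceptable depending on how large the collection is and how often Items is read.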

Why must I use Tasks?

While I have full control in this case, assume for the purposes of the question that I cannot change the base implementation: what would I do if I were stuck with Tasks? Furthermore, the operations are relatively time-consuming and CPU-bound, and I am trying to keep the user interface responsive, so I needed to offload that work to asynchronous operations. In most of my use-cases I do not need to chain several of them; I run a single one each time, or a couple of a known, fixed count, which I could hook together without iteration or async/await. One of the use-cases, however, finally required chaining an unknown number of Tasks together.

How I deal with this currently

The way I am dealing with this currently is to append a call to Wait() inside the ContinueWith call, i.e.:

foreach (AsyncProcessor<T> processor in m_processors)
{
    chainedTask = chainedTask.ContinueWith(t => processor.Process(input).Wait());
}

I would appreciate any idea on how I should do this, or how I could do it more elegantly (or, "async-properly", so to speak). Also, I would like to know how I can do this without async/await.

Why my question is different from this question, which did not answer my question entirely.

Because the linked question has two tasks, so the solution is to simply write the two lines required, while I have an arbitrary (and unknown) number of tasks, so I need a suitable iteration. Also, my method is not async. I now understand (from the single briefly available answer, which was deleted) that I could do it fairly easily if I changed my method to async and awaited each processor's Task method, but I still wish to know how this could be achieved without async/await syntax.

Why my question is not a duplicate of the other linked questions

Because none of them explains how to chain correctly using ContinueWith, and I am interested in a solution that uses ContinueWith and does not rely on the async/await pattern. I know that pattern may be the preferable solution, but I want to understand how (if possible) to chain an arbitrary number of tasks properly using ContinueWith calls. I now know I don't need ContinueWith. The question is, how do I do it with ContinueWith?


Solution

  • The method Task.ContinueWith does not understand async delegates the way Task.Run does, so when your delegate returns a Task, ContinueWith treats it as a normal return value and wraps it in another Task. You end up with a Task<Task> instead of the Task you expected. The problem would be obvious if AsyncProcessor.Process returned a generic Task<T>: you would get a compile error, because there is no conversion from Task<Task<T>> to Task<T>. In your case the conversion is from Task<Task> to Task, which is legal, since Task<TResult> derives from Task.

    Solving the problem is easy. You just need to unwrap the Task<Task> to a simple Task, and there is a built-in method Unwrap that does exactly that.
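
    To make the difference concrete, here is a small sketch. UnwrapDemo and its members are hypothetical names, and a TaskCompletionSource stands in for processor.Process(input), so the "processing" stays pending until it is released explicitly.

```csharp
using System.Threading.Tasks;

public static class UnwrapDemo
{
    public static async Task<(bool OuterDone, bool InnerDone, bool UnwrappedDoneBeforeRelease)> RunAsync()
    {
        // Stand-in for processor.Process(input): pending until SetResult is called.
        var gate = new TaskCompletionSource<bool>();
        Task pending = gate.Task;

        // Without Unwrap, the continuation's result is itself a Task.
        Task<Task> wrapped = Task.CompletedTask.ContinueWith(_ => pending);
        await wrapped; // awaits only the outer task
        bool outerDone = wrapped.IsCompleted;        // outer task has finished
        bool innerDone = wrapped.Result.IsCompleted; // inner work is still pending

        // With Unwrap, we get a proxy that completes only when the inner task does.
        Task unwrapped = Task.CompletedTask.ContinueWith(_ => pending).Unwrap();
        bool unwrappedDoneBeforeRelease = unwrapped.IsCompleted;

        gate.SetResult(true); // let the "processing" finish
        await unwrapped;      // now the proxy can complete

        return (outerDone, innerDone, unwrappedDoneBeforeRelease);
    }
}
```

    The outer Task<Task> reports completion while the inner task is still pending, which is why the original chain let all processors start almost at once. The unwrapped proxy is not complete until the inner task is.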

    There is another problem that you need to solve, though. Currently your code suppresses all exceptions that may occur in each individual AsyncProcessor.Process call, which I don't think was intended. So you must decide which strategy to follow. Are you going to propagate the first exception immediately, or would you prefer to collect them all and propagate them at the end bundled in an AggregateException, like Task.WhenAll does? The example below implements the first strategy.

    // Requires: using System.Threading; using System.Threading.Tasks;
    public class CompositeAsyncProcessor<T>
    {
        //...
        public Task Process(T input)
        {
            Task current = Task.CompletedTask;
            foreach (AsyncProcessor<T> processor in m_processors)
            {
                current = current.ContinueWith(antecessor =>
                {
                    // Stop at the first failure: skip this processor and
                    // pass the earlier exception down the chain.
                    if (antecessor.IsFaulted)
                        return Task.FromException(antecessor.Exception.InnerException);
                    return processor.Process(input);
                },
                    CancellationToken.None,
                    TaskContinuationOptions.ExecuteSynchronously,
                    TaskScheduler.Default
                ).Unwrap(); // Task<Task> -> Task
            }
            return current;
        }
    }
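
    For the second strategy (run every processor and report all failures at the end), a rough sketch could look like the following. It is not part of the original answer. SequentialChain and RunAllCollectErrors are hypothetical names, and the steps are modelled as Func<Task> delegates (for example () => processor.Process(input)) to keep the block self-contained. Cancellation is not treated specially here.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class SequentialChain
{
    // Runs the steps strictly one after another. A failing step does not stop
    // the chain; all failures are reported together on the returned task.
    public static Task RunAllCollectErrors(IEnumerable<Func<Task>> steps)
    {
        // Only one continuation runs at a time, so a plain List is sufficient.
        var errors = new List<Exception>();
        Task current = Task.CompletedTask;

        foreach (Func<Task> step in steps)
        {
            current = current.ContinueWith(antecedent =>
            {
                // Remember the previous step's failure, then carry on.
                if (antecedent.IsFaulted)
                    errors.AddRange(antecedent.Exception.InnerExceptions);
                return step();
            },
                CancellationToken.None,
                TaskContinuationOptions.ExecuteSynchronously,
                TaskScheduler.Default
            ).Unwrap();
        }

        // Final continuation: record the last failure and complete the result.
        var completion = new TaskCompletionSource<bool>();
        current.ContinueWith(last =>
        {
            if (last.IsFaulted)
                errors.AddRange(last.Exception.InnerExceptions);
            if (errors.Count > 0)
                completion.SetException(errors); // all errors end up in Task.Exception
            else
                completion.SetResult(true);
        },
            CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);

        return completion.Task;
    }
}
```

    As with Task.WhenAll, awaiting the returned task rethrows only the first exception; the full set is available through the task's Exception.InnerExceptions.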
    

    I have used an overload of ContinueWith that allows configuring all the options, because the defaults are not ideal. The default TaskContinuationOptions is None. Setting it to ExecuteSynchronously minimizes thread switches, since each continuation will normally run on the same thread that completed the previous task.

    The default task scheduler is TaskScheduler.Current. By specifying TaskScheduler.Default you make it explicit that you want the continuations to run on thread-pool threads (in the exceptional cases where they cannot run synchronously). TaskScheduler.Current is context-specific, and if it ever surprises you it won't be in a good way.

    As you can see, there are a lot of gotchas with the old-school ContinueWith approach. Using the modern await in a loop is a lot easier to implement, and a lot harder to get wrong.
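
    For comparison, the await-in-a-loop version could be sketched as follows. AwaitLoop and RunSequentiallyAsync are hypothetical names, and the steps are again modelled as Func<Task> delegates (for example () => processor.Process(input)).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AwaitLoop
{
    // Starts each step only after the previous one has completed.
    // The first exception ends the loop and faults the returned task,
    // which corresponds to the "propagate the first exception" strategy.
    public static async Task RunSequentiallyAsync(IEnumerable<Func<Task>> steps)
    {
        foreach (Func<Task> step in steps)
        {
            await step();
        }
    }
}
```

    No Unwrap, scheduler, or continuation options are needed here, because await already waits for the inner task and rethrows its exception.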