Tags: c#, asynchronous, async-await, task-parallel-library, pause

Cooperatively pausing async methods


I am using a Task for a long-running asynchronous processing operation that I want to be able to pause and resume at arbitrary moments. Luckily for me one of the TPL's own authors at Microsoft already came up with a solution to this problem. The only trouble is his solution doesn't work properly.

When you take out the await Task.Delay(100) in the code below, the code will stop honoring pause requests after the very first one. It appears the SomeMethodAsync code resumes execution on the same thread as the other task if the value of Thread.CurrentThread.ManagedThreadId is to be believed. Also the output of SomeMethodAsync suggests that it is running on several threads.

I have always found the TPL rather confusing and difficult to work with and async/await even more so, so I am having a hard time understanding what's even going on here. I'd be very grateful if anyone could explain.

Minimalist example code:

using System;
using System.Threading;
using System.Threading.Tasks;

namespace PauseTokenTest {
    class Program {
        static void Main() {
            var pts = new PauseTokenSource();
            Task.Run(() =>
            {
                while (true) {
                    Console.ReadLine();
                    Console.WriteLine(
                        $"{Thread.CurrentThread.ManagedThreadId}: Pausing task");
                    pts.IsPaused = !pts.IsPaused;
                }
            });
            SomeMethodAsync(pts.Token).Wait();
        }

        public static async Task SomeMethodAsync(PauseToken pause) {
            for (int i = 0; ; i++) {
                Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}: {i}");

                // Comment this out and repeatedly pausing and resuming will no longer work.
                await Task.Delay(100);

                await pause.WaitWhilePausedAsync();
            }
        }
    }

    public class PauseTokenSource {
        internal static readonly Task s_completedTask =
            Task.FromResult(true);
        volatile TaskCompletionSource<bool> m_paused;

        public bool IsPaused {
            get { return m_paused != null; }
            set {
                if (value) {
                    Interlocked.CompareExchange(
                        ref m_paused, new TaskCompletionSource<bool>(), null);
                } else {
                    while (true) {
                        var tcs = m_paused;
                        if (tcs == null) return;
                        if (Interlocked.CompareExchange(ref m_paused, null, tcs) == tcs) {
                            tcs.SetResult(true);
                            break;
                        }
                    }
                }
            }
        }

        public PauseToken Token { get { return new PauseToken(this); } }

        internal Task WaitWhilePausedAsync() {
            var cur = m_paused;
            return cur != null ? cur.Task : s_completedTask;
        }
    }

    public struct PauseToken {
        readonly PauseTokenSource m_source;
        internal PauseToken(PauseTokenSource source) {
            m_source = source;
        }

        public bool IsPaused {
            get { return m_source != null && m_source.IsPaused; }
        }

        public Task WaitWhilePausedAsync() {
            return IsPaused ? m_source.WaitWhilePausedAsync() :
                PauseTokenSource.s_completedTask;
        }
    }
}

Solution

  • This happens because of the way task continuations are executed. In many places in the framework, task continuations are executed synchronously when possible. It's not always possible, but one place where it does become common is on thread pool threads, which are often treated as though they're interchangeable.

    await is one of those places where synchronous continuations are used; this is not officially documented anywhere AFAIK but I describe the behavior on my blog.

    So, what is essentially happening is this: SomeMethodAsync is paused (awaiting the task returned from WaitWhilePausedAsync - without a context), which attaches the rest of SomeMethodAsync as a continuation on that task. Then when the Task.Run (thread pool) thread toggles IsPaused, it completes the task that was returned from WaitWhilePausedAsync. The runtime then looks at the context associated with that continuation (none), so that continuation is considered compatible with any thread pool thread. And hey, the current thread is a thread pool thread! So it just runs the continuation directly.

    The fun part about that is that without another await, the SomeMethodAsync method becomes entirely synchronous: it always retrieves an already-completed task from WaitWhilePausedAsync, so it just continues executing its infinite loop forever. As a reminder, await behaves synchronously with already-completed tasks (as I describe on my blog). This infinite loop is running within the IsPaused setter, so the Task.Run code never continues its loop to call ReadLine again.
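
    The synchronous fast path for already-completed tasks is easy to observe in isolation. The following is only an illustrative sketch: the class and method names are invented here and do not come from the question, and Task.CompletedTask assumes .NET Framework 4.6 or later. It awaits a completed task in a loop and then checks whether the returned task is already finished by the time the call returns.

    ```csharp
    using System;
    using System.Threading.Tasks;

    public static class CompletedAwaitDemo {
        // Hypothetical helper: awaits an already-completed task on every iteration.
        public static async Task<int> SpinAsync(int iterations) {
            int count = 0;
            for (int i = 0; i < iterations; i++) {
                // The task is already complete, so the method does not yield here.
                await Task.CompletedTask;
                count++;
            }
            return count;
        }

        public static void Main() {
            Task<int> task = SpinAsync(1000);
            // No await suspended the method, so the loop finished before the call returned.
            Console.WriteLine(task.IsCompleted); // True
            Console.WriteLine(task.Result);      // 1000
        }
    }
    ```

    With an infinite loop in place of the bounded one, the call to SpinAsync would never return, which is the situation the IsPaused setter ends up in.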

    If SomeMethodAsync does have another await that actually yields (such as the await Task.Delay(100)), then it can behave asynchronously, returning to its caller (completing the continuation), and allowing the Task.Run loop to continue executing.

    As suggested by Theodor Zoulias, passing the TaskCreationOptions.RunContinuationsAsynchronously flag to the TaskCompletionSource<bool> will also work. In that case, the task continuation is run asynchronously (on a separate thread pool thread), rather than executed directly on the calling thread.
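
    The difference between the two modes can be checked with a small experiment. This is a sketch under stated assumptions, not code from the question or the referenced blog post: ContinuationDemo and RunsOnCompletingThread are made-up names, and ConfigureAwait(false) is added so the result does not depend on whether the host installs a SynchronizationContext. It awaits a TaskCompletionSource<bool> task, completes it from a dedicated thread, and reports whether the code after the await ran on that same thread.

    ```csharp
    using System;
    using System.Threading;
    using System.Threading.Tasks;

    public static class ContinuationDemo {
        // Hypothetical helper: returns true when the code after the await
        // ran on the thread that called SetResult.
        public static bool RunsOnCompletingThread(TaskCreationOptions options) {
            var tcs = new TaskCompletionSource<bool>(options);
            var continuationRan = new ManualResetEventSlim();
            int continuationThreadId = -1;
            int completingThreadId = -1;

            async Task WaiterAsync() {
                // Assumption: no context is captured, as in the question's console app.
                await tcs.Task.ConfigureAwait(false);
                continuationThreadId = Environment.CurrentManagedThreadId;
                continuationRan.Set();
            }

            Task waiter = WaiterAsync(); // runs up to the await, then returns

            var completer = new Thread(() => {
                completingThreadId = Environment.CurrentManagedThreadId;
                tcs.SetResult(true);
                // Keep this thread alive until the continuation has run,
                // so the two thread IDs can be compared safely.
                continuationRan.Wait();
            });
            completer.Start();
            completer.Join();
            waiter.Wait();

            return continuationThreadId == completingThreadId;
        }

        public static void Main() {
            Console.WriteLine(RunsOnCompletingThread(TaskCreationOptions.None));
            Console.WriteLine(RunsOnCompletingThread(
                TaskCreationOptions.RunContinuationsAsynchronously));
        }
    }
    ```

    In a console app this is expected to print True for TaskCreationOptions.None (the continuation ran inside SetResult) and False with RunContinuationsAsynchronously (the continuation was queued to the thread pool).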

    IIRC, the blog post you referenced predates the RunContinuationsAsynchronously flag. It also predates the (non-generic) TaskCompletionSource.

    Also as suggested by Theodor Zoulias, I have a PauseTokenSource in my Nito.AsyncEx library that avoids this issue. It also uses RunContinuationsAsynchronously.
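
    For reference, the RunContinuationsAsynchronously change applied to the question's PauseTokenSource could look roughly like this. It is a sketch, not the Nito.AsyncEx implementation: the only intended change to the question's types is the flag passed to the TaskCompletionSource<bool> constructor, and PauseDemo with its WorkAsync worker is a hypothetical harness whose only await is the pause check, which is the case that hung before.

    ```csharp
    using System;
    using System.Threading;
    using System.Threading.Tasks;

    public class PauseTokenSource {
        internal static readonly Task s_completedTask = Task.FromResult(true);
        volatile TaskCompletionSource<bool> m_paused;

        public bool IsPaused {
            get { return m_paused != null; }
            set {
                if (value) {
                    // Only change from the question's code: waiters are resumed on the
                    // thread pool instead of inside the SetResult call below.
                    Interlocked.CompareExchange(ref m_paused,
                        new TaskCompletionSource<bool>(
                            TaskCreationOptions.RunContinuationsAsynchronously), null);
                } else {
                    while (true) {
                        var tcs = m_paused;
                        if (tcs == null) return;
                        if (Interlocked.CompareExchange(ref m_paused, null, tcs) == tcs) {
                            tcs.SetResult(true);
                            break;
                        }
                    }
                }
            }
        }

        public PauseToken Token { get { return new PauseToken(this); } }

        internal Task WaitWhilePausedAsync() {
            var cur = m_paused;
            return cur != null ? cur.Task : s_completedTask;
        }
    }

    public struct PauseToken {
        readonly PauseTokenSource m_source;
        internal PauseToken(PauseTokenSource source) { m_source = source; }

        public bool IsPaused { get { return m_source != null && m_source.IsPaused; } }

        public Task WaitWhilePausedAsync() {
            return IsPaused ? m_source.WaitWhilePausedAsync() : PauseTokenSource.s_completedTask;
        }
    }

    public static class PauseDemo {
        static volatile bool s_stop;

        // Hypothetical worker with no other await besides the pause check.
        static async Task WorkAsync(PauseToken pause) {
            while (!s_stop) {
                await pause.WaitWhilePausedAsync();
            }
        }

        public static void Main() {
            var pts = new PauseTokenSource();
            Task worker = Task.Run(() => WorkAsync(pts.Token));
            for (int i = 0; i < 3; i++) {
                pts.IsPaused = true;
                Thread.Sleep(50);
                pts.IsPaused = false; // returns promptly; the worker resumes elsewhere
                Console.WriteLine($"Pause/resume cycle {i + 1} done");
            }
            s_stop = true;
            worker.Wait();
        }
    }
    ```

    Without the flag, the first pts.IsPaused = false in this harness would run the worker's loop inside the setter and never return, so the later cycles would not be reached.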