Search code examples
c#multithreadingsystem.reactiverx.net

How to make the Rx callbacks run on the ThreadPool?


Would you expect that the program below prints False?

using System;
using System.Threading;
using System.Reactive.Linq;
using System.Reactive.Concurrency;

public static class Program
{
    public static void Main()
    {
        Observable
            .Return(1)
            .ObserveOn(ThreadPoolScheduler.Instance)
            .Do(x => Console.WriteLine(Thread.CurrentThread.IsThreadPoolThread))
            .Wait();
    }
}

Output:

False

My undestanding was that the ThreadPoolScheduler is intended for scheduling work on the ThreadPool, but apparently this is not what happening. Its name probably refers to some other thread pool, internal to Rx, and not to the actual ThreadPool class in the System.Threading namespace.

I made various attempts to force the callback to run on the ThreadPool, but with no success. Some things I've tried:

.ObserveOn(Scheduler.Default)
.ObserveOn(DefaultScheduler.Instance)
.ObserveOn(TaskPoolScheduler.Default)
.ObserveOn(new TaskPoolScheduler(Task.Factory))
.ObserveOn(new TaskPoolScheduler(new TaskFactory(TaskScheduler.Default)))
ThreadPoolScheduler.Instance.DisableOptimizations(); // At the start of the program

The last attempt above was after reading this question in Microsoft's forums. No matter what I've tried, the Thread.IsThreadPoolThread property keeps returning false.

Am I supposed to wrote my own IScheduler implementation to ensure that my code runs on the ThreadPool? It sounds like a quite significant undertaking for such a trivial goal.

.NET 5.0.1, System.Reactive 5.0.0, C# 9


Solution

  • The link you shared actually has a solution:

    var modScheduler = ThreadPoolScheduler.Instance.DisableOptimizations(new[] { typeof(ISchedulerLongRunning) });
    Observable
        .Return(1)
        .ObserveOn(modScheduler)
        .Select(_ => Thread.CurrentThread)
        .Subscribe(t => Console.WriteLine(t.IsThreadPoolThread));
    

    If you look at the source of ThreadPoolScheduler, you'll see that all the work gets sent to the ThreadPool, except for ScheduleLongRunning. If you disable that optimization, then all work will get sent to the ThreadPool.