Offloading long blocking calls to separate threads using System.Reactive


I want to execute a blocking method on separate threads. To illustrate the situation, I created this example:

int LongBlockingCall(int n)
{
    Thread.Sleep(1000);
    return n + 1;
}

var observable = Observable
    .Range(0, 10)
    .SelectMany(n =>
        Observable.Defer(() => Observable.Return(LongBlockingCall(n), NewThreadScheduler.Default)));

var results = await observable.ToList();

I expected this to run in ~1 second, but it takes ~10 seconds instead. A new thread should be spawned for each call, because I'm specifying NewThreadScheduler.Default, right?

What am I doing wrong?


Solution

  • You have to replace this line:

    Observable.Return(LongBlockingCall(n), NewThreadScheduler.Default)
    

    ...with this:

    Observable.Start(() => LongBlockingCall(n), NewThreadScheduler.Default)
    

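    Observable.Return only wraps a value that already exists; it has no knowledge of how that value was produced. In the original code, LongBlockingCall(n) is evaluated as an ordinary method argument before Observable.Return is even invoked, so it runs synchronously on the subscribing thread, one call after another. The scheduler passed to Observable.Return only determines where the resulting notification is emitted. To invoke a function on a specific scheduler, you need the Observable.Start method.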
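
For reference, here is a minimal sketch of the whole example with that replacement applied. It assumes a console project with top-level statements and the System.Reactive NuGet package installed; the Stopwatch timing and the final Console.WriteLine are additions for illustration and are not part of the original question. The Observable.Defer wrapper is dropped here because SelectMany already invokes its selector once per element.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

// Same blocking method as in the question.
int LongBlockingCall(int n)
{
    Thread.Sleep(1000);
    return n + 1;
}

// Illustrative timing only; not part of the original example.
var stopwatch = Stopwatch.StartNew();

var observable = Observable
    .Range(0, 10)
    .SelectMany(n =>
        // Observable.Start invokes the delegate on the given scheduler,
        // so the blocking call no longer runs on the subscribing thread.
        Observable.Start(() => LongBlockingCall(n), NewThreadScheduler.Default));

// SelectMany merges the inner sequences, so the order of the
// results is not guaranteed to match the input order.
var results = await observable.ToList();

stopwatch.Stop();
Console.WriteLine($"Got {results.Count} results in {stopwatch.ElapsedMilliseconds} ms");
```

With the calls running concurrently, the total time should be close to the duration of a single call rather than the sum of all ten, though the exact figure depends on the machine.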