
Synchronicity in RxJS


I would expect that the following code would run asynchronously:

var range = Rx.Observable.range(0, 3000000);

range.subscribe(
  function(x) {},
  function(err) {},
  function() {
    console.log('Completed');
});

console.log('Hello World');

But that's not the case. Working through the large range of numbers takes a while, and execution resumes only after it has completed.

I am confused as to when to expect RxJS to behave synchronously or asynchronously. Does it depend on the method used? My previous idea was that once we are in Observables/Observer land, everything in it runs asynchronously, similar to how promises work.


Solution

  • RxJS follows the same rules as Rx.NET. By default, each observable operator uses the minimum amount of asynchronicity needed to do its work. In this case, range can run through the numbers synchronously, and so it does (its documentation says it uses Rx.Scheduler.currentThread by default).

    If you want to introduce more asynchronicity than is needed for an operation, you need to tell it to use a different Scheduler.

    To get the behavior you were expecting, you want to use Rx.Scheduler.timeout. This will, in essence, cause it to schedule each iteration via setTimeout. (In practice it is not quite this simple: the scheduler uses the fastest method available in the browser to schedule deferred work.)

    var range = Rx.Observable.range(0, 3000000, Rx.Scheduler.timeout);
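
To make the role of the scheduler concrete, here is a minimal sketch in plain JavaScript. It is a toy model, not the RxJS implementation: toyRange, immediateScheduler and timeoutScheduler are hypothetical names invented for this illustration, and a "scheduler" is reduced to a function that decides when a unit of work runs.

```javascript
// Toy model only: none of these names are RxJS APIs.
// A "scheduler" here is a function that decides when a unit of work runs.
var immediateScheduler = function (work) { work(); };            // run right now
var timeoutScheduler = function (work) { setTimeout(work, 0); }; // run on a later tick

// Hypothetical helper: emits start .. start + count - 1, one scheduled step per value.
// With immediateScheduler this recurses once per value, so keep count small here.
function toyRange(start, count, scheduler, onNext, onCompleted) {
  var i = 0;
  function step() {
    if (i < count) {
      onNext(start + i);
      i += 1;
      scheduler(step); // ask the scheduler to run the next iteration
    } else {
      onCompleted();
    }
  }
  scheduler(step);
}

// Synchronous scheduling: the whole range finishes before the next statement.
var syncLog = [];
toyRange(0, 3, immediateScheduler, function () {}, function () {
  syncLog.push('Completed');
});
syncLog.push('Hello World');
// syncLog is now ['Completed', 'Hello World']

// Timer-based scheduling: the caller continues first, the range runs later.
var asyncLog = [];
toyRange(0, 3, timeoutScheduler, function () {}, function () {
  asyncLog.push('Completed');
});
asyncLog.push('Hello World');
// asyncLog is ['Hello World'] here; 'Completed' is appended on a later tick.
```

The only difference between the two runs is the scheduler argument, which is the same lever the third parameter of Rx.Observable.range gives you.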
    


    Note that iterating through 3 million numbers via setTimeout will take nearly forever, so we may want to process them in batches of 1,000 instead. Here we take advantage of the default behavior of range, which runs synchronously, then batch the values and use observeOn to run the batches via the timeout scheduler:

    var range = Rx.Observable
        .range(0, 3000000)
        .bufferWithCount(1000)
        .observeOn(Rx.Scheduler.timeout) // run each buffer via setTimeout
        .select(function (buffer, i) {
            console.log("processing buffer", i);
            return Rx.Observable.fromArray(buffer);
        })
        .concatAll(); // concat the buffers together
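
The same batching idea can be sketched without the library. This is an illustration under stated assumptions, not how the RxJS operators are implemented: processInBatches is a hypothetical helper, the input is shrunk to 10,000 values, and each batch gets one setTimeout call.

```javascript
// Hypothetical helper in plain JavaScript; not an RxJS operator.
function processInBatches(values, batchSize, onBatch, onCompleted) {
  // Step 1: split the input synchronously (similar in spirit to bufferWithCount).
  var batches = [];
  for (var i = 0; i < values.length; i += batchSize) {
    batches.push(values.slice(i, i + batchSize));
  }

  // Step 2: hand out one batch per timer tick (similar in spirit to observeOn
  // with a timer-based scheduler), so other work can run between batches.
  var index = 0;
  function next() {
    if (index < batches.length) {
      onBatch(batches[index], index);
      index += 1;
      setTimeout(next, 0);
    } else {
      onCompleted();
    }
  }
  setTimeout(next, 0);
}

// Build a small input so the sketch runs quickly.
var values = [];
for (var n = 0; n < 10000; n += 1) {
  values.push(n);
}

var batchSizes = [];
var batchesDone = false;
processInBatches(values, 1000, function (batch) {
  batchSizes.push(batch.length);
}, function () {
  batchesDone = true;
});
// Nothing has been processed at this point; the ten batches run on later ticks.
```

Within a batch the values are still handled synchronously, so the batch size trades responsiveness against timer overhead.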
    

    Note that there is a delay at the beginning while range cranks through all 3,000,000 values and bufferWithCount produces 3,000 arrays. This is unusual in real production code, where the data source is rarely as trivial as Observable.range.

    FYI, promises are not entirely different in this respect. In some promise implementations (jQuery Deferreds before 3.0, for example), calling then on a promise that is already completed may run the callback synchronously; Promises/A+ implementations and native ES2015 promises always defer it. All Promises and Observables really do is present an interface through which you can provide callbacks that are guaranteed to run when the condition is met, whether it is already met or will be met later. RxJS then provides many mechanisms to force something to run asynchronously if you really want it that way, as well as methods to introduce specific timings.
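
For native ES2015 promises specifically, the ordering is easy to check. The sketch below assumes an environment with a built-in Promise; the variable names are only for illustration.

```javascript
var order = [];

// A promise that is already fulfilled when then is called.
var settled = Promise.resolve('value');

settled.then(function () {
  order.push('then callback');
});
order.push('after then');

// At this point order is ['after then']: a native promise never invokes the
// callback during the call to then, even if the promise is already settled.
// The callback runs once the current synchronous code has finished.
```

Libraries that predate or deviate from Promises/A+ may behave differently, so it is worth checking the one you use.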