javascript, functional-programming, rxjs, frp, rxjs5

forEach operator being evaluated without subscription


I'm trying to teach myself some reactive functional programming. This video from Ben Lesh shows an example of an observable. My prior reading indicated that an observable is lazy, i.e., it evaluates only after being subscribed to. Strangely enough, this code prints to the console without any subscription.

var Rx = require('rxjs/Rx')

var source = Rx.Observable.from([1,2,3,4,5]);

var newSource = source.filter(x => x % 2 === 1)
                .map(x => x + '!')
                .forEach(x => console.log(x));

From the RxJS docs:

It seems as though the Observable must be actively resolving the promises emitted by .forEach. I am so confused by this.

Further confusion stems from this code:

var Rx = require('rxjs/Rx')

var source = Rx.Observable.from([1,2,3,4,5]);

var newSource = source.filter(x => x % 2 === 1)
      .map(x => x + '!')
      .do(x => console.log(x));

This does not evaluate until newSource.subscribe() is called. Please help me understand the difference between the two operators here.


Solution

  • Observables are lazy by default. When you apply an operator to an observable, RxJS creates a new observable under the hood that is linked to the previous one. Observables themselves are immutable.

    However, forEach is a special kind of operator. It does not return a new Observable. Instead, it returns a Promise, subscribes to the observable under the hood, and calls a function for every element that observable emits. If you check the source of forEach, which is implemented on the Observable class itself, you will see the following (just a snippet):

    const subscription = this.subscribe((value) => {
      if (subscription) {
        // if there is a subscription, then we can surmise
        // the next handling is asynchronous. Any errors thrown
        // need to be rejected explicitly and unsubscribe must be
        // called manually
        try {
          next(value);
        } catch (err) {
          reject(err);
          subscription.unsubscribe();
        }
      // ... (snippet ends here; the rest of the method is not shown)

    Here we can see that the observable is subscribed to and each value is passed to next, which is the function you pass to the forEach call.
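
To make the difference concrete without depending on a particular RxJS version, here is a minimal sketch that uses a toy stand-in for Observable. ToyObservable and its methods are hypothetical and written only for illustration; this is not the RxJS implementation, and it leaves out unsubscription and most error handling. It only tries to show that filter, map and do wrap the source without running it, while forEach subscribes right away and returns a Promise.

```javascript
// Toy stand-in for an Observable (hypothetical; NOT the RxJS implementation).
class ToyObservable {
  // `producer` runs only when subscribe() is called: this is the lazy part.
  constructor(producer) {
    this.producer = producer;
  }

  static from(values) {
    return new ToyObservable(observer => {
      values.forEach(v => observer.next(v));
      observer.complete();
    });
  }

  subscribe(next, error, complete) {
    this.producer({
      next: next || (() => {}),
      error: error || (() => {}),
      complete: complete || (() => {}),
    });
  }

  // Operators return a NEW observable linked to this one; nothing runs yet.
  filter(predicate) {
    return new ToyObservable(observer =>
      this.subscribe(
        v => { if (predicate(v)) observer.next(v); },
        e => observer.error(e),
        () => observer.complete()
      )
    );
  }

  map(project) {
    return new ToyObservable(observer =>
      this.subscribe(
        v => observer.next(project(v)),
        e => observer.error(e),
        () => observer.complete()
      )
    );
  }

  do(sideEffect) {
    return new ToyObservable(observer =>
      this.subscribe(
        v => { sideEffect(v); observer.next(v); },
        e => observer.error(e),
        () => observer.complete()
      )
    );
  }

  // forEach subscribes immediately and returns a Promise instead of
  // another observable. (Unsubscribing on error is omitted in this toy.)
  forEach(next) {
    return new Promise((resolve, reject) => {
      this.subscribe(
        v => { try { next(v); } catch (err) { reject(err); } },
        reject,
        resolve
      );
    });
  }
}

const log = [];
const odds = ToyObservable.from([1, 2, 3, 4, 5])
  .filter(x => x % 2 === 1)
  .map(x => x + '!');

// do() only builds another lazy observable.
const withDo = odds.do(x => log.push('do ' + x));
const afterDo = log.slice(); // []

// forEach() subscribes internally, so the callback runs right here.
const done = odds.forEach(x => log.push('forEach ' + x));
const afterForEach = log.slice(); // ['forEach 1!', 'forEach 3!', 'forEach 5!']

// The do() side effect runs only once we subscribe explicitly.
withDo.subscribe();
const afterSubscribe = log.slice(); // the above, plus 'do 1!', 'do 3!', 'do 5!'

console.log(afterDo, afterForEach, afterSubscribe);
```

In this sketch, done is a Promise that resolves once the source completes, which mirrors why the asker's newSource in the forEach example is not an observable and cannot be subscribed to.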