
In RxJS, does Observer get injected into Observable execution?


I have read through the ReactiveX documentation several times, and still cannot wrap my head around exactly what happens when an Observer subscribes to an Observable.

Let's look at a simple example:

import { Observable } from 'rxjs'; 

const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.complete();
});

const observer = {
  next: (x) => console.log('got value ' + x),
  error: (err) => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done')
};

observable.subscribe(observer);



My Question:

Where does the subscriber object that is passed to the Observable come from?

From the RxJS documentation:

It is not a coincidence that observable.subscribe and subscribe in new Observable(function subscribe(subscriber) {...}) have the same name. In the library, they are different, but for practical purposes you can consider them conceptually equal.

So, apparently the object passed into the subscribe callback in the Observable constructor (subscriber) is not actually the observer object. At least not if you go by the quote above on how the library actually works.

If it is not the observer object that is passed in, then what exactly are subscriber.next(1) and subscriber.complete() calling? How does that connect to the next property in the observer?


Clarifying Edit:

I know how to use RxJS, and I understand that one can conceptually imagine that the Observer is injected (as the quote says). However, I am looking to understand how it actually works.


Solution

  • The Observable creation process flows as follows:

    An Observable is defined by the author (here manually with new, for purposes of the explanation):

    const myObservable = new Observable(function subscribe(subscriber) {
      subscriber.next(1);
      subscriber.next(2);
      subscriber.complete();
      return function tearDownLogic() {
        console.log('runs when Observable for whatever reason is done (complete, error, or unsubscribed)')
      }
    });
    

    The subscribe callback passed to the Observable above is saved locally by the Observable constructor:

    constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
      if (subscribe) {
        this._subscribe = subscribe;
      }
    }
    

    So the entire subscribe function, whether written by us or supplied by a pre-made Observable, is stored for later execution.
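
To make the "stored for later" point concrete, here is a minimal sketch in plain JavaScript. It is not the RxJS source: LazyObservable is a hypothetical stand-in that keeps only the part discussed so far, and it hands the observer to the stored function directly instead of wrapping it first, which the real library does not do.

```javascript
// Hypothetical stand-in for Observable, reduced to the constructor behaviour
// described above. Not the real RxJS class.
class LazyObservable {
  constructor(subscribe) {
    // Only store the function; it is not called here.
    this._subscribe = subscribe;
  }

  subscribe(observer) {
    // The stored function runs once per subscribe() call.
    // Simplification: the real library wraps the observer before this step.
    return this._subscribe(observer);
  }
}

const log = [];

const lazy = new LazyObservable(function subscribe(subscriber) {
  log.push('subscribe callback ran');
  subscriber.next(1);
});

log.push('observable created');
lazy.subscribe({ next: (v) => log.push('got ' + v) });

console.log(log);
// [ 'observable created', 'subscribe callback ran', 'got 1' ]
```

Nothing inside the subscribe callback runs at construction time, which is why 'observable created' is logged first.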

    An Observer can be passed to the subscribe method in one of several forms: either as one to three functions passed directly (next, error, complete), or as an object with one or more of the same three methods. For this explanation, we will use the latter, more verbose option:

    const observer = {
      next(v) {
        console.log(v);
      },
      error(err) {
        console.log(err);
      },
      complete() {
        console.log('Observable has now completed and can no longer emit values to observer');
      }
    };
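
As a rough illustration of how both forms can be reduced to one shape, here is a small sketch. normalizeObserver is a hypothetical helper, not an RxJS function, and the no-op defaults are a simplification made for this example (the real library handles a missing error handler differently).

```javascript
const noop = () => {};

// Hypothetical helper: accepts either an observer object or up to three
// functions, and always returns an object with next, error and complete.
function normalizeObserver(observerOrNext, error, complete) {
  if (observerOrNext !== null && typeof observerOrNext === 'object') {
    const partial = observerOrNext;
    return {
      // bind keeps `this` pointing at the original observer object
      next: partial.next ? partial.next.bind(partial) : noop,
      error: partial.error ? partial.error.bind(partial) : noop,
      complete: partial.complete ? partial.complete.bind(partial) : noop,
    };
  }
  // Function form: any handler that was not supplied becomes a no-op.
  return {
    next: observerOrNext || noop,
    error: error || noop,
    complete: complete || noop,
  };
}

const seen = [];
const fromFunctions = normalizeObserver((v) => seen.push('fn ' + v));
const fromObject = normalizeObserver({
  complete() {
    seen.push('object done');
  },
});

fromFunctions.next(1);
fromFunctions.complete(); // no-op, no complete handler was given
fromObject.next(2);       // no-op, the object has no next method
fromObject.complete();

console.log(seen);
// [ 'fn 1', 'object done' ]
```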
    

    Now, the fun part starts. We pass the observer into the Observable.subscribe(..) method:

    myObservable.subscribe(observer);
    

    The subscribe method looks like this:

      subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
                error?: (error: any) => void,
                complete?: () => void): Subscription {

        const { operator } = this;
        const sink = toSubscriber(observerOrNext, error, complete);

        if (operator) {
          sink.add(operator.call(sink, this.source));
        } else {
          sink.add(
            this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
            this._subscribe(sink) :
            this._trySubscribe(sink)
          );
        }

        if (config.useDeprecatedSynchronousErrorHandling) {
          if (sink.syncErrorThrowable) {
            sink.syncErrorThrowable = false;
            if (sink.syncErrorThrown) {
              throw sink.syncErrorValue;
            }
          }
        }

        return sink;
      }
    

    Briefly described, the subscribe method:

    1. Receives the observer in one of its previously discussed forms
    2. toSubscriber converts the observer to a Subscriber object, regardless of its passed in form (the Subscriber instance is saved in the sink variable)
    3. Note: the operator property is undefined unless the Observable was produced by applying an operator, so the if branch that handles operator can be ignored here
    4. Subscriber extends (is prototype-linked to) the Subscription object, which has two important methods on its prototype: unsubscribe(), add()
    5. add(..) is used to add "tear down logic" (functions) to the Observable, that will run when an Observable completes or is unsubscribed. It will take any function that is passed to it, wrap it in a Subscription object, and place the function into the Subscription's _unsubscribe variable. This Subscription is saved on the Subscriber we created above, in a variable called _subscriptions. As noted, we do all that so that when the Subscriber is unsubscribed or completes, all the add()'ed tear down logic executes
    6. As a side note, Observable.subscribe() returns the Subscriber instance. Thus, you can call mySubscriber.add( // some tear down logic) on it at any point to add functions that will execute when the Observable completes or is unsubscribed
    7. An important part now unfolds: this._trySubscribe(sink) runs (its return value becomes the argument to add()). _trySubscribe(..) is the function that actually runs the subscribe callback saved earlier by the Observable constructor. Importantly, it passes in sink (our new Subscriber instance) as the argument to that callback. In other words, when subscriber.next(1) executes inside the Observable, we are actually executing next(1) on the sink (Subscriber) instance (next() is on Subscriber's prototype), which in turn forwards the value to the next method of the original observer.
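
The steps above can be sketched end to end in plain JavaScript. This is a heavily simplified model under stated assumptions, not the RxJS source: MiniSubscription, MiniSubscriber and MiniObservable are hypothetical names, operators and the deprecated synchronous error handling are left out, and the observer is assumed to already be an object.

```javascript
// Hypothetical, simplified stand-ins for Subscription, Subscriber, Observable.
class MiniSubscription {
  constructor() {
    this.closed = false;
    this._teardowns = [];
  }
  add(teardown) {
    if (typeof teardown !== 'function') return; // nothing to register
    if (this.closed) {
      teardown(); // already finished: run the teardown right away
      return;
    }
    this._teardowns.push(teardown);
  }
  unsubscribe() {
    if (this.closed) return;
    this.closed = true;
    this._teardowns.forEach((fn) => fn());
  }
}

class MiniSubscriber extends MiniSubscription {
  constructor(observer) {
    super();
    this.destination = observer; // the observer handed to subscribe()
    this.isStopped = false;
  }
  next(value) {
    // Forward to the observer unless the stream has already ended.
    if (!this.isStopped && this.destination.next) this.destination.next(value);
  }
  error(err) {
    if (this.isStopped) return;
    this.isStopped = true;
    if (this.destination.error) this.destination.error(err);
    this.unsubscribe();
  }
  complete() {
    if (this.isStopped) return;
    this.isStopped = true;
    if (this.destination.complete) this.destination.complete();
    this.unsubscribe();
  }
  unsubscribe() {
    this.isStopped = true;
    super.unsubscribe();
  }
}

class MiniObservable {
  constructor(subscribe) {
    this._subscribe = subscribe; // stored, not executed
  }
  subscribe(observer) {
    const sink = new MiniSubscriber(observer); // step 2: wrap the observer
    sink.add(this._trySubscribe(sink));        // step 7: run the stored callback
    return sink;                               // step 6: caller gets the Subscriber
  }
  _trySubscribe(sink) {
    try {
      return this._subscribe(sink); // `subscriber` in the callback is `sink`
    } catch (err) {
      sink.error(err);
    }
  }
}

const events = [];
const source = new MiniObservable((subscriber) => {
  subscriber.next(1);
  subscriber.complete();
  subscriber.next(2); // ignored: the subscriber is already stopped
  return () => events.push('teardown');
});

const plainObserver = {
  next: (v) => events.push('next ' + v),
  complete: () => events.push('complete'),
};
const subscription = source.subscribe(plainObserver);

console.log(events);                         // [ 'next 1', 'complete', 'teardown' ]
console.log(subscription === plainObserver); // false
```

In this sketch the object the callback receives is the wrapper, not the observer itself, yet every call on it ends up on the observer's own methods. Because complete() runs before the callback returns, the teardown function is executed as soon as it is passed to add().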

    So, that takes me to the end, for now. There are more details inside toSubscriber and around the unsubscribe process, among other things, but those are outside the scope of this Q&A.

    In short summary, to answer the question in the title, the Observer is indeed passed into the Observable, simply after being converted to a unifying Subscriber object.

    Hopefully, that will help someone else in the future.