I have read through the ReactiveX documentation several times, and still cannot wrap my head around exactly what happens when an Observer subscribes to an Observable.
Let's look at a simple example:
import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.complete();
});
const observer = {
next: (x) => console.log('got value ' + x),
error: (err) => console.error('something wrong occurred: ' + err),
complete: () => console.log('done')
};
observable.subscribe(observer);
My Question:
Where does the subscriber
object that is passed to the Observable come from?
From the RxJS documentation:
It is not a coincidence that
observable.subscribe
andsubscribe
innew Observable(function subscribe(subscriber) {...})
have the same name. In the library, they are different, but for practical purposes you can consider them conceptually equal.
So, apparently the object passed into the subscribe callback in the Observable constructor (subscriber
) is not actually the observer
object. At least not if you go by the quote above on how the library actually works.
If it is not the observer
object that is passed in, then what exactly is subscriber.next(1)
and subscribe.complete()
calling? How does that connect to the next
property in the observer
?
Clarifying Edit:
I know how to utilize RxJS and indeed that one can conceptually imagine that the Observer is injected (as the quote says). However, I am here looking to understand how it actually works.
The Observable
creation process flows as follows:
An Observable
is defined by the author (here manually with new
, for purposes of the explanation):
const myObservable = new Observable(function subscribe(subscriber) {
subscriber.next(1);
subscriber.next(2);
subscriber.complete();
return function tearDownLogic() {
console.log('runs when Observable for whatever reason is done (complete, error, or unsubscribed)')
}
});
The subscribe
callback passed to the Observable
above is saved locally by the Observable constructor:
constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
if (subscribe) {
this._subscribe = subscribe;
}
}
So, we have that entire subscribe
function, defined by us or any other pre-made Observable
, saved down for later execution.
An Observer can be passed to the subscribe
callback in one of several forms. Either, as one to three functions directly (next, error, complete), or as an object with one or more of the same three methods. For purposes of this explanation, we will implement the last and more verbose option:
const observer = {
next(v) {
console.log(v);
}
error(err) {
console.log(err);
}
complete() {
console.log('Observable has now completed and can no longer emit values to observer');
}
}
Now, the fun part starts. We pass the observer
into the Observable.subscribe(..)
method:
myObserver.subscribe(observer);
The subscribe method looks like this:
subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
error?: (error: any) => void,
complete?: () => void): Subscription {
const { operator } = this;
const sink = toSubscriber(observerOrNext, error, complete);
if (operator) {
sink.add(operator.call(sink, this.source));
} else {
sink.add(
this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
this._subscribe(sink) :
this._trySubscribe(sink)
);
}
if (config.useDeprecatedSynchronousErrorHandling) {
if (sink.syncErrorThrowable) {
sink.syncErrorThrowable = false;
if (sink.syncErrorThrown) {
throw sink.syncErrorValue;
}
}
}
return sink;
}
Briefly described, the subscribe
method:
observer
in one of its previously discussed formstoSubscriber
converts the observer to a Subscriber
object, regardless of its passed in form (the Subscriber
instance is saved in the sink
variable)operator
variable is undefined
, unless you subscribe to an operator. Thus, just ignore the if
statements around operator
Subscriber
extends (is prototype-linked to) the Subscription
object, which has two important methods on its prototype: unsubscribe()
, add()
add(..)
is used to add "tear down logic" (functions) to the Observable
, that will run when an Observable
completes or is unsubscribed. It will take any function that is passed to it, wrap it in a Subscription
object, and place the function into the Subscription
's _unsubscribe
variable. This Subscription
is saved on the Subscriber
we created above, in a variable called _subscriptions
. As noted, we do all that so that when the Subscriber
is unsubscribed or completes, all the add()
'ed tear down logic executesObservable.subscribe()
returns the Subscriber
instance. Thus, you can call mySubscriber.add( // some tear down logic)
on it at any point to add functions that will execute when the Observable
completes or is unsubscribedthis._trySubscribe(sink)
runs (inside add()
, as a parameter). _trySubscribe(..)
is the function that actually runs the subscribe
callback earlier saved by the Observable
constructor. Importantly, it passes in sink
(our new Subscriber
instance) as the callback to the Observable
callback. In other words, when subscriber.next(1)
inside the Observable
executes, we are actually executing next(1)
in the sink
(Subscriber
) instance (next()
is on Subscriber
's prototype).So, that takes me to the end, for now. There are more details inside toSubscribe
and around the unsubscribe process, among other things, but those are outside the scope of this Q&A.
In short summary, to answer the question in the title, the Observer is indeed passed into the Observable
, simply after being converted to a unifying Subscriber
object.
Hopefully, that will help someone else in the future.