I created an Observable generating potentially infinite amount of data (e.g. a timer). This data is accessed through a subject, so multiple observers would receive the same values.
How to stop the Observable generating new values? (without modifying the implementation of the Observable)
// custom Observable, to visualize internal behavior
const timer$ = new rxjs.Observable(subscriber => {
console.log("observable init");
var counter = 0;
const intervalId = setInterval(() => {
++counter;
console.log("observable %s",counter);
subscriber.next(counter);
}, 1000);
return () => {
console.log("observable teardown");
clearTimeout(intervalId);
}
});
// subscribe through a subject
const subject$ = new rxjs.Subject();
timer$.subscribe(subject$);
const subscription = subject$.subscribe(value => console.log("observer %s", value));
// cancel subscription
setTimeout(() => {
console.log("unsubscribe observer");
subscription.unsubscribe();
// TODO how to stop Observable generating new values?
}, 3000);
jsfiddle: https://jsfiddle.net/gy4tfd5w/
Fortunately, there is a dedicated and elegant way to solve this in RxJS.
Your requirement is to have
multiple observers [...] receive the same values
That's called a multicast observable and there are certain operators that are used to create one from an ordinary "cold" observable.
For example, instead of creating an instance of Subject
directly, you can just pipe the source observable to the share
operator and it will create the Subject
for you. The documentation of share
reads:
Returns a new Observable that multicasts (shares) the original Observable. As long as there is at least one Subscriber this Observable will be subscribed and emitting data. When all subscribers have unsubscribed it will unsubscribe from the source Observable.
The last sentence shows the subtle difference between share
and source$.subscribe(subject)
. With share
a so-called refCount
is kept that unsubscribes the Subject
from its source automatically when there are no subscribers to it left.
Applied to your code, it looks like this:
const timer$ =
new rxjs.Observable(subscriber => {// your unchanged implementation})
.pipe(
share()
);
const subscription = timer$.subscribe(value => console.log("observer %s", value));
Here is the full version with your example code:
https://jsfiddle.net/50q746ad/
By the way, share
is not the only operator that performs multicasting. There are great learning resources that go much more in depth on that topic.