Search code examples
rxjsobservablesubjectunsubscribe

rxjs - stop observable with a subject subscription


I created an Observable generating potentially infinite amount of data (e.g. a timer). This data is accessed through a subject, so multiple observers would receive the same values.

How to stop the Observable generating new values? (without modifying the implementation of the Observable)

// custom Observable, to visualize internal behavior
const timer$ = new rxjs.Observable(subscriber => {
  console.log("observable init");

  var counter = 0;
  const intervalId = setInterval(() => {
    ++counter;
    console.log("observable %s",counter);
    subscriber.next(counter);
  }, 1000);

  return () => {
    console.log("observable teardown");
    clearTimeout(intervalId);
  }
});

// subscribe through a subject
const subject$ = new rxjs.Subject();
timer$.subscribe(subject$);

const subscription = subject$.subscribe(value => console.log("observer %s", value));

// cancel subscription
setTimeout(() => {
  console.log("unsubscribe observer");
  subscription.unsubscribe();
  // TODO how to stop Observable generating new values?
}, 3000);

jsfiddle: https://jsfiddle.net/gy4tfd5w/


Solution

  • Fortunately, there is a dedicated and elegant way to solve this in RxJS.

    Your requirement is to have

    multiple observers [...] receive the same values

    That's called a multicast observable and there are certain operators that are used to create one from an ordinary "cold" observable.

    For example, instead of creating an instance of Subject directly, you can just pipe the source observable to the share operator and it will create the Subject for you. The documentation of share reads:

    Returns a new Observable that multicasts (shares) the original Observable. As long as there is at least one Subscriber this Observable will be subscribed and emitting data. When all subscribers have unsubscribed it will unsubscribe from the source Observable.

    The last sentence shows the subtle difference between share and source$.subscribe(subject). With share a so-called refCount is kept that unsubscribes the Subject from its source automatically when there are no subscribers to it left.

    Applied to your code, it looks like this:

    const timer$ = 
        new rxjs.Observable(subscriber => {// your unchanged implementation})
        .pipe(
            share()
        );
    
    const subscription = timer$.subscribe(value => console.log("observer %s", value));
    

    Here is the full version with your example code:

    https://jsfiddle.net/50q746ad/

    By the way, share is not the only operator that performs multicasting. There are great learning resources that go much more in depth on that topic.