Is it necessary to unsubscribe from a combineLatest within a forkJoin?


According to the RxJS documentation, as I understand it, forkJoin completes once all of its source observables have completed, which automatically unsubscribes, so there is no need to unsubscribe manually.

But what if I have a combineLatest inside it that subscribes to something, like this:

    forkJoin({
      xSubject: this.serviceX.on(),
      ySubject: this.serviceY.on(),
      zSubject: this.serviceZ.on(),
      wSubject: this.serviceW.on()
    })
      .pipe(
        switchMap(({ xSubject, ySubject, zSubject, wSubject }) => {
          return combineLatest([xSubject, ySubject, zSubject, wSubject]);
        })
      )
      .subscribe(([x, y, z, w]) => {
        console.log(x);
        console.log(y);
        console.log(z);
        console.log(w);
      });

Is it necessary to unsubscribe from it? What is the best way in this case, if necessary?


Solution

  • As a rule of thumb, it's better to always close subscriptions explicitly, even for sources such as forkJoin and Angular HTTP observables that emit once and complete. Several things can leave a subscription open; for example, forkJoin never emits or completes if any one of its sources never completes.

    In your case specifically, the inner observable returned by combineLatest is not closed automatically: forkJoin completes, but switchMap keeps the subscription open until the inner observable completes as well. So you need to close it yourself. Notice in the following snippet that 'complete' is only logged after the close observable has emitted, which in turn ends the subscription through the takeUntil operator.

    // `rxjs` is the global exposed by the UMD bundle loaded in the script tag at the end.
    const { of, forkJoin, combineLatest, Subject } = rxjs;
    const { switchMap, takeUntil } = rxjs.operators;
    
    const obsX = new Subject();
    const obsY = new Subject();
    const obsZ = new Subject();
    const obsW = new Subject();
    const close = new Subject();
    
    forkJoin({
      xSubject: of(obsX),
      ySubject: of(obsY),
      zSubject: of(obsZ),
      wSubject: of(obsW),
    })
      .pipe(
        switchMap(({ xSubject, ySubject, zSubject, wSubject }) => {
          return combineLatest([xSubject, ySubject, zSubject, wSubject]);
        }),
        takeUntil(close)
      )
      .subscribe({
        next: ([x, y, z, w]) => {
          console.log(x);
          console.log(y);
          console.log(z);
          console.log(w);
        },
        complete: () => console.log('complete'),
      });
    
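    // One value per subject; combineLatest first emits once all four have a value (after ~4 s).
    setTimeout(() => obsX.next('x value'), 1000);
    setTimeout(() => obsY.next('y value'), 2000);
    setTimeout(() => obsZ.next('z value'), 3000);
    setTimeout(() => obsW.next('w value'), 4000);
    
    // Emitting on `close` triggers takeUntil, which completes the stream and logs 'complete'.
    setTimeout(() => close.next(), 6000);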
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.1/rxjs.umd.min.js"></script>