According to the RXJS documentation itself, from what I understood, the forkJoin, when finishing all the calls, it gives completed()
, which automatically unsubscribes, with this it is not necessary to manually unsubscribe.
But what if I have a combineLatest that is signing something, like this:
forkJoin({
xSubject: this.serviceX.on(),
ySubject: this.serviceY.on(),
zSubject: this.serviceZ.on(),
wSubject: this.serviceW.on()
})
.pipe(
switchMap(({ xSubject, ySubject, zSubject, wSubject }) => {
return combineLatest([xSubject, ySubject, zSubject, wSubject]);
})
)
.subscribe(([x, y, z, w]) => {
console.log(x)
console.log(y)
console.log(z)
console.log(w)
});
Is it necessary to unsubscribe from it? What is the best way in this case, if necessary?
As a rule of thumb, it's always better to close subscriptions, even for functions like forkJoin
and Angular HTTP Observables that emits once and completes. There are multiple factors that could leave the subscriptions open.
In your case specifically, the inner observable is not closed automatically. So you'd need to close it. Notice in the following snippet, the complete
is only logged after the close
observable has emitted, which inturn manually closes the subscription using the takeUntil
opeartor.
const { of, forkJoin, combineLatest, Subject } = rxjs;
const { switchMap, takeUntil } = rxjs.operators;
const obsX = new Subject();
const obsY = new Subject();
const obsZ = new Subject();
const obsW = new Subject();
const close = new Subject();
forkJoin({
xSubject: of(obsX),
ySubject: of(obsY),
zSubject: of(obsZ),
wSubject: of(obsW),
})
.pipe(
switchMap(({ xSubject, ySubject, zSubject, wSubject }) => {
return combineLatest([xSubject, ySubject, zSubject, wSubject]);
}),
takeUntil(close)
)
.subscribe({
next: ([x, y, z, w]) => {
console.log(x);
console.log(y);
console.log(z);
console.log(w);
},
complete: () => console.log('complete'),
});
setTimeout(() => obsX.next('x value'), 1000);
setTimeout(() => obsY.next('y value'), 2000);
setTimeout(() => obsZ.next('z value'), 3000);
setTimeout(() => obsW.next('w value'), 4000);
setTimeout(() => close.next(), 6000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.1/rxjs.umd.min.js"></script>