Tags: rxjs, observable, reactive-programming

Use takeWhile operator with different Observable source as a flag


I want to unsubscribe from an observable by using another observable source.

stop$: Subject<boolean> = new Subject<boolean>();
source: Subject<string> = new Subject<string>();
this.source.pipe(takeWhile(this.stop$ === false)).subscribe()

Can I do that with the takeWhile operator?


Solution

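  • The takeWhile operator emits values for as long as its predicate returns true, and completes the first time the predicate returns false. The predicate must be a function that receives each emitted value and returns a boolean, so you cannot pass a Subject here. An expression such as this.stop$ === false does not work either: it compares the Subject object itself to false, so it is always false. You could work around this with a BehaviorSubject or similar, but it's not recommended.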

    EDITED:

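    A better solution is to use the takeUntil operator, which completes the subscription as soon as a second (notifier) observable emits. Here the notifier is stop$ filtered down to falsy values; flip the predicate to val => val if emitting true is what should stop the stream: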

    this.source
      .pipe(
        takeUntil(
          this.stop$.pipe(filter(val => !val))
        )
      )
      .subscribe()
    

    Or you can extract an Observable:

    unsubscribe$: Observable<boolean> = this.stop$.pipe(filter(val => !val));
    
    this.source
      .pipe(takeUntil(this.unsubscribe$))
      .subscribe()
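
To see the difference between the two operators end to end, here is a minimal standalone sketch. It assumes RxJS 6 or later (operators imported from 'rxjs/operators'). The numbers$ and source$ subjects, the sample values, and the collecting arrays are made up for illustration and are not part of the original question.

```typescript
import { Subject } from 'rxjs';
import { filter, takeUntil, takeWhile } from 'rxjs/operators';

// takeWhile: the predicate inspects each value emitted by the source itself.
const numbers$ = new Subject<number>(); // hypothetical source for illustration
const small: number[] = [];
numbers$.pipe(takeWhile(n => n < 3)).subscribe(n => small.push(n));
[1, 2, 3, 1].forEach(n => numbers$.next(n));
// small is [1, 2]: the stream completed at 3, so the trailing 1 is ignored.

// takeUntil: a second observable decides when the source stops.
const stop$ = new Subject<boolean>();
const source$ = new Subject<string>();
const received: string[] = [];
let completed = false;

source$
  .pipe(takeUntil(stop$.pipe(filter(val => !val))))
  .subscribe({
    next: value => received.push(value),
    complete: () => { completed = true; },
  });

source$.next('a');
stop$.next(true);   // filtered out, so the subscription stays active
source$.next('b');
stop$.next(false);  // passes the filter, so takeUntil completes the stream
source$.next('c');  // not delivered

console.log(small, received, completed);
```

With the filter as written in the answer, only a falsy emission from stop$ ends the subscription, so 'a' and 'b' are received and 'c' is not.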