Tags: rxjs, rxjs5, rxjs6

Adding to previous result, Observable pipeline only running once



I have an array of filters exposed as an Observable, and I'd like to add filters to it and remove filters from it. Here is the code I have; it currently adds a filter only the first time the function runs.

The second time, nothing happens.

private _filters$ = new BehaviorSubject<Filter[]>([]);

addFilter(added: Filter) {
    debugger
    // adding to array of filters
    this._filters$.pipe(
        tap(d => { debugger; }),
        first(), 
        map(filters => ([...filters, added]))
    ).subscribe(this._filters$);
}

So my question is: why does this happen? Why does it run only once? (By the way, first() is not the reason.)

I know I can make the code work like so:

private _filters$ = new BehaviorSubject<Filter[]>([]);

currentFilters;

init() {
   this._filters$.subscribe(f => this.currentFilters = f);
}

addFilter(added: Filter) {
    this._filters$.next([...this.currentFilters, added]);
}

Solution

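  • Actually, it is because of first(). The first time you run the function, it builds the pipeline and subscribes to the BehaviorSubject, with that same BehaviorSubject passed in as the observer. The subject emits its current value, first() forwards it through map back into the BehaviorSubject, and then first() completes. Because the BehaviorSubject is the subscriber, that completion is forwarded to it as well, so the BehaviorSubject itself completes. The second time you run the function, the BehaviorSubject is already stopped: a new subscription gets no value and is completed immediately, and any further next() calls on the subject are ignored.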

    Without knowing much about your actual goal, my suggestion is to put a subject at the top of the pipeline instead of putting the BehaviorSubject at the bottom.

    import { Subject } from 'rxjs';
    import { scan, shareReplay } from 'rxjs/operators';

    // You don't actually need the caching behavior yet, so just use a `Subject`
    private _filters$ = new Subject<Filter>();
    
    // Hook this up to whatever is going to be using these filters
    private _pipeline$ = this._filters$.pipe(
      // Use scan to accumulate the array instead of mapping back into the subject itself
      scan((filters: Filter[], newFilter: Filter) => [...filters, newFilter], [] as Filter[]),
      // Store the latest value for new subscribers
      shareReplay(1)
    );
    
    // Now this method just pushes into the `Subject`, and the pipeline never has to be torn down
    addFilter(added: Filter) {
        this._filters$.next(added);
    }
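
To see both behaviours side by side, here is a minimal standalone sketch. It assumes RxJS 6-style imports and a made-up `Filter` shape with a single `name` field. The names `broken$`, `addFilterBroken`, `filterAdded$` and the `*Seen` arrays are placeholders for illustration, not part of the original code. The first half reproduces the pattern from the question and records that the BehaviorSubject completes after the first call. The second half runs the scan-based pipeline outside a class.

```typescript
import { BehaviorSubject, Subject } from "rxjs";
import { first, map, scan, shareReplay } from "rxjs/operators";

// Hypothetical filter shape, for illustration only.
interface Filter {
  name: string;
}

// Part 1: the pattern from the question. first() completes after one value,
// and because the BehaviorSubject is the subscriber, that completion stops it.
const broken$ = new BehaviorSubject<Filter[]>([]);
const brokenSeen: string[][] = [];
let brokenCompleted = false;

broken$.subscribe({
  next: (filters) => brokenSeen.push(filters.map((f) => f.name)),
  complete: () => {
    brokenCompleted = true;
  },
});

function addFilterBroken(added: Filter): void {
  broken$
    .pipe(
      first(),
      map((filters) => [...filters, added])
    )
    .subscribe(broken$);
}

addFilterBroken({ name: "a" }); // pushes [a] into broken$, then completes it
addFilterBroken({ name: "b" }); // broken$ is already stopped, so no array is emitted

// Part 2: the suggested pattern. New filters go into a plain Subject and
// scan accumulates them, so nothing ever feeds back into the source.
const filterAdded$ = new Subject<Filter>();
const filters$ = filterAdded$.pipe(
  scan((filters: Filter[], added: Filter) => [...filters, added], [] as Filter[]),
  shareReplay(1)
);

// shareReplay connects to the source on the first subscription,
// so subscribe before pushing any filters.
const fixedSeen: string[][] = [];
filters$.subscribe((filters) => fixedSeen.push(filters.map((f) => f.name)));

filterAdded$.next({ name: "a" });
filterAdded$.next({ name: "b" });

// A late subscriber still receives the latest array, replayed by shareReplay(1).
const lateSeen: string[][] = [];
filters$.subscribe((filters) => lateSeen.push(filters.map((f) => f.name)));

console.log(brokenSeen, brokenCompleted, fixedSeen, lateSeen);
```

One thing to keep in mind with this design: unlike the BehaviorSubject version, the scan pipeline emits nothing until the first filter is added, and filters pushed before anything subscribes to the pipeline are lost. If an initial empty array is needed, something like `startWith` after `scan` would be one option.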