Search code examples
angularrxjsobservablebehaviorsubjectsubject

How to merge multiple subjects into observable and emit on subscribe


I would like to merge multiple subjects (fooSubject$, barSubject$) into one observable (filterChanged$) and return the (initial) values (_foo, _bar), when I subscribe to merged observable (filterChanged$) and when one of the values change.

I already used BehaviorSubject (for both subjects) but then filterChanged$ will emit twice on subscribe. How can I immediately return the values (_foo, _bar) when subscribe to observable filterChanged$ without using BehaviorSubject?

export interface IFilterData {
    foo: string;
    bar: string;
}

@Injectable({
    providedIn: 'root'
})
export class FilterService {
    public foo$: Observable<string>;
    public bar$: Observable<string>;
    public filterChanged$: Observable<IFilterData>;

    private fooSubject$: Subject<string>;
    private barSubject$: Subject<string>;
    private _foo: string;
    private _bar: string;

    constructor() {
        this._foo = 'InitialFoo';
        this._bar = 'InitialBar';

        this.foo$ = this.fooSubject$.asObservable();
        this.bar$ = this.barSubject$.asObservable();

        this.filterChanged$ = merge(this.fooSubject$, this.barSubject$).pipe(
            switchMap(() => {
                return of({
                    foo: this._foo,
                    bar: this._bar
                });
            })
        );
    }

    set foo(value: string) {
        this._foo = value;
        this.fooSubject$.next(value);
    }
    get foo(): string {
        return this._foo;
    }

    set bar(value: string) {
        this._bar = value;
        this.barSubject$.next(value);
    }
    get bar(): string {
        return this._bar;
    }
}

Solution

  • I already used BehaviorSubject (for both subjects) but then filterChanged$ will emit twice on subscribe.

    This happens because, as you might now, when a BehaviorSubject is subscribed, it will emit its latest values synchronously to the new subscriber.

    How can I immediately return the values (_foo, _bar) when subscribe to observable filterChanged$

    There is a way to achieve this, but note that, with this approach, the BehaviorSubjects won't store these _foo and _bar values.(there is a way to store them as well)

    combineLatest(
      this.fooSubject$.pipe(skip(1), startWith(this._foo)),
      this.barSubject$.pipe(skip(1), startWith(this._bar)),
    ),.subscribe(/* ... */)
    

    this will wait until each observable emits once, then will emit when one of those 2 observables emit.

    skip(1) is used because we don't want the values that are currently stored by the Subjects, but the _foo & _bar values.