Wait for observables to complete before calling next one


I am using RxJS as shown below, and the values arrive in an unexpected order. The code can be run in StackBlitz.

EDIT: simplified code. The problem reduces to two observers of the same BehaviorSubject that receive the values in a different order.

this.state$ = new BehaviorSubject<any>({ a: null, b: null });

const obs1 = this.state$
  .subscribe((value) => {
    console.log('obs 1', value);
    if (value.a == 1) this.state$.next({ a: 2, b: 3 });
  });

const obs2 = this.state$.subscribe((value) => console.log('obs 2', value));

this.state$.next({ a: 1, b: 1 });

This is what the logs look like

obs 1 {a: null, b: null}
obs 2 {a: null, b: null}
obs 1 {a: 1, b: 1}
obs 1 {a: 2, b: 3}
obs 2 {a: 2, b: 3}
obs 2 {a: 1, b: 1}

I would expect the observers of the same behavior subject to receive emitted values in the same order, wherever these values come from.
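
The ordering in the logs above can be reproduced without RxJS. The sketch below uses a hypothetical `TinyBehaviorSubject` (a stand-in written for illustration, not RxJS's actual implementation) that, like a subject, calls its listeners synchronously one after another. A listener that emits from inside its callback starts a nested delivery that finishes before the outer one resumes.

```typescript
// Hypothetical stand-in for a subject; NOT RxJS's real implementation.
type Listener<T> = (value: T) => void;

class TinyBehaviorSubject<T> {
  private listeners: Listener<T>[] = [];
  private current: T;

  constructor(initial: T) {
    this.current = initial;
  }

  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
    listener(this.current); // replay the current value on subscribe
  }

  next(value: T): void {
    this.current = value;
    // Synchronous delivery: if a listener calls next() from here, that nested
    // call runs to completion before this loop reaches the next listener.
    for (const listener of [...this.listeners]) listener(value);
  }
}

type State = { a: number | null; b: number | null };

const log: string[] = [];
const state$ = new TinyBehaviorSubject<State>({ a: null, b: null });

state$.subscribe((v) => {
  log.push(`obs 1 ${JSON.stringify(v)}`);
  if (v.a === 1) state$.next({ a: 2, b: 3 }); // re-entrant emission
});
state$.subscribe((v) => log.push(`obs 2 ${JSON.stringify(v)}`));

state$.next({ a: 1, b: 1 });

// Prints the same interleaving as the logs above:
// "obs 2" sees {a: 2, b: 3} before {a: 1, b: 1}.
console.log(log.join('\n'));
```

Each observer is called in subscription order, so the second observer only gets the outer value after the nested emission has already reached everyone.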

Previous code

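// Imports the snippet relies on (assumes RxJS 7.2+; on older versions,
// import map and distinctUntilChanged from 'rxjs/operators' instead).
import { Component } from '@angular/core';
import { CommonModule } from '@angular/common';
import { BehaviorSubject, Observable, distinctUntilChanged, map } from 'rxjs';

class sampleState {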
  a: any;
  b: any;
}

const initialState: sampleState = {
  a: null,
  b: null,
};

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: ``,
})
export class App {
  name = 'Angular';

  protected _state$: BehaviorSubject<sampleState>;
  state$: Observable<sampleState>;

  constructor() {
    this._state$ = new BehaviorSubject<sampleState>(initialState);
    this.state$ = this._state$.asObservable();

    this.select((state) => state.a).subscribe((value) => {
      console.log('a ' + value);
      if (value == 1) this.setState({ b: 3 });
    });

    this.select((state) => state.b).subscribe((value) => {
      console.log('b ' + value);
    });

    this._state$.subscribe((value) => console.log(value));

    this.setState({ a: 1, b: 1 });
  }

  public get state() {
    return this._state$.getValue();
  }

  public setState(newState: any) {
    this._state$.next({
      ...this.state,
      ...newState,
    });
  }

  public select<K>(mapFn: (state: sampleState) => K): Observable<K> {
    return this._state$.asObservable().pipe(
      map((state: sampleState) => mapFn(state)),
      distinctUntilChanged()
    );
  }
}

I get the following logs

{a: null, b: null}
{a: 1, b: 3}
{a: 1, b: 1}

while I would expect

{a: null, b: null} // initialisation
{a: 1, b: 1} // setState
{a: 1, b: 3} // if a == 1 => set b = 3

It appears the first setState is not completed before calling the next one. Is there a way to change this behavior so the states change sequentially?


Solution

  • You're seeing this behavior because every operation is synchronous and you emit a new value from inside your "select a" subscription, so the subscribe logic runs recursively: the nested emission reaches all subscribers before the outer emission has been delivered to the subscribers that come later.

    To prevent the recursion, you can use the asapScheduler, which delivers each value asynchronously, after the currently running code has finished, like this:

    this.state$ = this._state$.pipe(observeOn(asapScheduler));
    

    Now, update your select method to use this.state$ instead of this._state$ and you should be good to go.

    Here's a StackBlitz you can check out.
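
Put together, the change described in this answer might look like the sketch below. It is a framework-free reduction of the question's component, so the `Store` class, the `SampleState` interface and the `seen` array are hypothetical names introduced for illustration. It assumes RxJS 7.2 or later, where `observeOn`, `map` and `distinctUntilChanged` can be imported from `'rxjs'`.

```typescript
import {
  BehaviorSubject,
  Observable,
  asapScheduler,
  distinctUntilChanged,
  map,
  observeOn,
} from 'rxjs';

// Hypothetical, framework-free version of the question's state holder.
interface SampleState {
  a: number | null;
  b: number | null;
}

class Store {
  private readonly _state$ = new BehaviorSubject<SampleState>({ a: null, b: null });

  // Every notification is re-scheduled on asapScheduler, so subscribers are
  // called asynchronously instead of inside the next() call that produced it.
  readonly state$: Observable<SampleState> = this._state$.pipe(observeOn(asapScheduler));

  get state(): SampleState {
    return this._state$.getValue();
  }

  setState(patch: Partial<SampleState>): void {
    this._state$.next({ ...this.state, ...patch });
  }

  // select() now reads from the scheduled stream rather than from _state$.
  select<K>(mapFn: (state: SampleState) => K): Observable<K> {
    return this.state$.pipe(map(mapFn), distinctUntilChanged());
  }
}

const store = new Store();
const seen: string[] = [];

store.select((s) => s.a).subscribe((a) => {
  if (a === 1) store.setState({ b: 3 });
});
store.state$.subscribe((s) => seen.push(JSON.stringify(s)));
store.setState({ a: 1, b: 1 });

// Nothing has been delivered yet at this point. Once the scheduled work has
// run, `seen` should hold the three states in the order the question expected.
setTimeout(() => console.log(seen.join('\n')), 0);
```

The trade-off is that subscribers no longer receive values synchronously, not even the initial value, so code that reads state right after `setState` should use the `state` getter rather than rely on a subscriber having already run.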