javascript rxjs5

Process observable subscribe events synchronously


I'm looking for a way to process events from ReplaySubject.subscribe() in a synchronous fashion.

let onSomeEvent = new ReplaySubject();

onSomeEvent.subscribe(async (event) => {      
  return await this.saveEventToDb(event);
});

In this example, saveEventToDb() first checks the database for an event with the same ID. If none exists, it stores the event.

The problem is I need to account for duplicate events firing from the subject.

In this example, when two duplicate events fire back-to-back, both get added to the database, because saveEventToDb() is called twice in immediate succession without waiting for the previous call to finish.
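The race described above can be reproduced without a real database. This is only a sketch: `createFakeDb()`, `delay()`, and `runUnqueued()` are hypothetical names, and the in-memory array and timeouts are assumptions standing in for the real lookup and insert.

```javascript
// Hypothetical helper: resolves after `ms` milliseconds to simulate I/O latency.
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical in-memory stand-in for the database (not from the original post).
function createFakeDb() {
  const rows = [];
  // Check-then-insert, mirroring how saveEventToDb() is described.
  async function saveEventToDb(event) {
    await delay(5); // simulated lookup latency
    const exists = rows.some((row) => row.id === event.id);
    if (exists) return;
    await delay(5); // simulated insert latency
    rows.push(event);
  }
  return { rows, saveEventToDb };
}

// Two duplicates fired back-to-back: the second call starts before the first finishes.
async function runUnqueued() {
  const { rows, saveEventToDb } = createFakeDb();
  await Promise.all([saveEventToDb({ id: 1 }), saveEventToDb({ id: 1 })]);
  return rows.length;
}

runUnqueued().then((count) => console.log("rows stored:", count)); // rows stored: 2
```

Both calls pass the existence check before either has inserted, so the duplicate is stored twice.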

How can I queue these up using RxJS?


Solution

  • The following worked to process the events one at a time, in order:

    onSomeEvent
        .map(event => {
           return Observable.defer(() => {
              return this.saveEventToDb(event);
           });
        })
        .concatAll()
        .subscribe();
    

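    concatAll() collects the inner observables and subscribes to the next one only when the previous one completes. The Observable.defer() wrapper matters here: map() runs as soon as each event arrives, and defer() postpones the call to saveEventToDb() until concatAll() subscribes to that inner observable.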
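To see the queuing effect in isolation, here is a plain-JavaScript sketch of the same idea, using a promise chain instead of RxJS. It is not how concatAll() is implemented. `createSerialQueue()`, `createFakeDb()`, `delay()`, and `runQueued()` are hypothetical names, and the in-memory array is an assumption standing in for the database.

```javascript
// Hypothetical helper: resolves after `ms` milliseconds to simulate I/O latency.
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical in-memory stand-in for the database (not from the original post).
function createFakeDb() {
  const rows = [];
  async function saveEventToDb(event) {
    await delay(5); // simulated lookup latency
    const exists = rows.some((row) => row.id === event.id);
    if (exists) return;
    await delay(5); // simulated insert latency
    rows.push(event);
  }
  return { rows, saveEventToDb };
}

// Hypothetical helper (not an RxJS API): runs tasks one at a time, in arrival order.
function createSerialQueue() {
  let tail = Promise.resolve();
  return function enqueue(task) {
    // Like the defer() wrapper: `task` is not called until its turn comes.
    const result = tail.then(() => task());
    // Swallow errors on the chain only, so one failed task does not block later ones.
    // (This differs from concatAll(), where an inner error terminates the stream.)
    tail = result.catch(() => {});
    return result;
  };
}

// The same two duplicate events, now saved strictly one after the other.
async function runQueued() {
  const { rows, saveEventToDb } = createFakeDb();
  const enqueue = createSerialQueue();
  await Promise.all([
    enqueue(() => saveEventToDb({ id: 1 })),
    enqueue(() => saveEventToDb({ id: 1 })),
  ]);
  return rows.length;
}

runQueued().then((count) => console.log("rows stored:", count)); // rows stored: 1
```

The second save starts only after the first has finished, so its existence check finds the row and skips the insert.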