I'm looking for a way to process events from ReplaySubject.subscribe()
sequentially, so that each event is fully handled before the next one starts.
let onSomeEvent = new ReplaySubject();
onSomeEvent.subscribe(async (event) => {
  return await this.saveEventToDb(event);
});
In this example, saveEventToDb() first checks the database to see whether an event with the same ID has already been stored. If not, it stores the event.
The problem is I need to account for duplicate events firing from the subject.
In this example, when two duplicate events fire back-to-back, both get added to the database because saveEventToDb()
gets called twice immediately, without waiting for the previous call to finish.
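To make the race concrete, here is a minimal sketch that reproduces it without RxJS or a real database. Everything in it is an assumption for illustration: the EventRow type, the in-memory table array, and the await Promise.resolve() that stands in for the database round trip.

```typescript
// Hypothetical row type; the real event shape is not shown in the question.
type EventRow = { id: number };

async function demoRace(): Promise<number> {
  // In-memory stand-in for the database table (assumption).
  const table: EventRow[] = [];

  // Check-then-insert with an await between the two steps.
  async function saveEventToDb(event: EventRow): Promise<void> {
    const exists = table.some(row => row.id === event.id);
    await Promise.resolve(); // simulated I/O; other calls can run here
    if (!exists) {
      table.push(event);
    }
  }

  // Two duplicate events handled back-to-back, neither waiting for the other.
  await Promise.all([saveEventToDb({ id: 1 }), saveEventToDb({ id: 1 })]);
  return table.length;
}
```

Both calls run their existence check before either one inserts, so demoRace() resolves to 2 rows for a single event ID.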
How can I queue these up using RxJS?
The following worked to process the events sequentially, one at a time:
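onSomeEvent
  .map(event => {
    // defer() postpones the call to saveEventToDb() until the inner observable is subscribed to.
    return Observable.defer(() => {
      return this.saveEventToDb(event);
    });
  })
  // concatAll() subscribes to the inner observables one at a time, in order.
  .concatAll()
  .subscribe();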
concatAll(): collects the inner observables and subscribes to the next one only when the previous one completes.
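The queueing behaviour described above can be sketched in plain TypeScript with a promise chain. This is not RxJS and not how concatAll() is implemented; it is only an analogy under stated assumptions. SerialQueue, QueuedEvent, and the in-memory table are hypothetical names introduced for illustration.

```typescript
// Hypothetical row type; the real event shape is not shown in the question.
type QueuedEvent = { id: number };

// Runs async tasks strictly one after another, in arrival order.
class SerialQueue {
  private tail: Promise<void> = Promise.resolve();

  enqueue(task: () => Promise<void>): Promise<void> {
    // The task is not started until every earlier task has settled.
    const run = this.tail.then(task);
    // Swallow rejections on the chain so one failure does not block later tasks.
    this.tail = run.catch(() => undefined);
    return run;
  }
}

async function demoSerial(): Promise<number[]> {
  const table: QueuedEvent[] = []; // in-memory stand-in for the database
  const queue = new SerialQueue();

  async function saveEventToDb(event: QueuedEvent): Promise<void> {
    const exists = table.some(row => row.id === event.id);
    await Promise.resolve(); // simulated I/O
    if (!exists) {
      table.push(event);
    }
  }

  // Two duplicates and one distinct event, all enqueued immediately.
  await Promise.all([
    queue.enqueue(() => saveEventToDb({ id: 1 })),
    queue.enqueue(() => saveEventToDb({ id: 1 })),
    queue.enqueue(() => saveEventToDb({ id: 2 })),
  ]);
  return table.map(row => row.id);
}
```

Passing a function rather than an already-started promise plays the same role as Observable.defer() in the answer: the save is not invoked until its turn comes, so the second duplicate sees the row written by the first. demoSerial() resolves to [1, 2].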