I'm trying to implement a backpressure strategy on HTTP requests, to temporarily hold back pending requests for a number of seconds under certain conditions. The logic to pause will be based on another Observable.
My research and understanding leads me to believe the pausableBuffered
operator does exactly what I need. Documented here http://reactivex.io/documentation/operators/backpressure.html.
However I can not find this operator in ReactiveX v5 (5.0.0-beta.0) and the migration guide (v4 - v5) seems to indicate they have been dropped. If this is the case, how can I achieve the desired result with the v5 available operators?
The backpressure story has been dropped entirely for now.
Here's one way to get the same result:
const pausableBuffered = (observable, pauser) => {
const subj = new rx.Subject();
let buffer = [];
const nextEmitter = x => subj.next(x);
const nextBuffer = x => buffer.push(x);
let subscriber = nextEmitter;
observable.subscribe(x => subscriber(x));
pauser.subscribe(value => {
if (value) {
subscriber = nextBuffer;
} else {
buffer.forEach(nextEmitter);
buffer = [];
subscriber = nextEmitter;
}
})
return subj;
};