reactive-programming, rxjs, reactive-streams

How to pause and buffer Observables in RxJS v5


I'm trying to implement a backpressure strategy on HTTP requests, to temporarily hold back pending requests for a number of seconds under certain conditions. The logic to pause will be based on another Observable.

My research leads me to believe the pausableBuffered operator does exactly what I need. It is documented at http://reactivex.io/documentation/operators/backpressure.html.

However, I cannot find this operator in RxJS v5 (5.0.0-beta.0), and the migration guide (v4 to v5) seems to indicate that it has been dropped. If so, how can I achieve the same result with the operators available in v5?


Solution

  • Backpressure support, including pausableBuffered, has been dropped entirely from RxJS v5 for now.

    Here's one way to get the same result:

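    import { Subject } from 'rxjs';

    const pausableBuffered = (observable, pauser) => {
        const subj = new Subject();
    
        let buffer = [];
        const nextEmitter = x => subj.next(x);   // pass the value straight through
        const nextBuffer = x => buffer.push(x);  // hold the value while paused
    
        // Current handler for source values; swapped whenever the pauser emits.
        let subscriber = nextEmitter;
        observable.subscribe(x => subscriber(x));
    
        pauser.subscribe(value => {
            if (value) {
                // Truthy value: pause and start buffering.
                subscriber = nextBuffer;
            } else {
                // Falsy value: flush the buffer in order, then resume.
                buffer.forEach(nextEmitter);
                buffer = [];
                subscriber = nextEmitter;
            }
        });
    
        return subj;
    };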
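
To trace how the helper behaves when the pauser toggles, here is a minimal dependency-free sketch. MiniSubject is a hypothetical stand-in (not part of RxJS) that supports only next() and subscribe(), so the snippet runs without installing the library. The names requests, received and whilePaused are likewise illustrative. With real RxJS you would pass in Subjects or Observables instead.

```javascript
// Hypothetical stand-in for an RxJS Subject: only next() and subscribe().
class MiniSubject {
    constructor() { this.observers = []; }
    subscribe(onNext) { this.observers.push(onNext); }
    next(value) { this.observers.forEach(fn => fn(value)); }
}

// Same logic as the answer's pausableBuffered, built on the stand-in.
const pausableBuffered = (observable, pauser) => {
    const subj = new MiniSubject();
    let buffer = [];
    const nextEmitter = x => subj.next(x);
    const nextBuffer = x => buffer.push(x);
    let subscriber = nextEmitter;
    observable.subscribe(x => subscriber(x));
    pauser.subscribe(paused => {
        if (paused) {
            subscriber = nextBuffer;
        } else {
            buffer.forEach(nextEmitter);
            buffer = [];
            subscriber = nextEmitter;
        }
    });
    return subj;
};

const requests = new MiniSubject();
const pauser = new MiniSubject();
const received = [];
pausableBuffered(requests, pauser).subscribe(x => received.push(x));

requests.next('a');                    // not paused: delivered immediately
pauser.next(true);                     // pause
requests.next('b');                    // buffered
requests.next('c');                    // buffered
const whilePaused = received.slice();  // snapshot: ['a']
pauser.next(false);                    // resume: 'b' and 'c' are flushed in order
requests.next('d');                    // delivered immediately again

console.log(whilePaused);  // [ 'a' ]
console.log(received);     // [ 'a', 'b', 'c', 'd' ]
```

Note that the helper forwards only next notifications: errors and completion from the source are not propagated to the returned subject, and neither inner subscription is ever torn down. Whether that matters depends on how long the source lives in your application.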