rxjs rxjs5

Pause, upon resume give last paused value


I have a hot Observable fed by a socket. I can use the pausable operator to pause the socket feed, but once I 'unpause' the Observable, I need to display the last value the socket sent while the subscription was paused. I don't want to keep track of the socket's last value manually. How could this be done?

From the example in the documentation, see comments below:

var pauser = new Rx.Subject();
var source = Rx.Observable.fromEvent(document, 'mousemove').pausable(pauser);

var subscription = source.subscribe(
    function (x) {
        // Somehow, after pauser.onNext(true), push the last socket value sent while this was paused.
        console.log('Next: ' + x.toString());
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });

// To begin the flow
pauser.onNext(true); 

// To pause the flow at any point
pauser.onNext(false);  

Solution

  • You don't need pausable for this. (Note also that you tagged the question RxJS 5, but pausable exists only in RxJS 4.) Instead, cache the latest value with publishReplay(1) and turn your pauser into a higher-order Observable that switches between the source and an empty stream:

    var source = Rx.Observable.fromEvent(document, 'mousemove')
      // Always preserves the last value sent from the source so that
      // new subscribers can receive it.
      .publishReplay(1);
    
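    pauser
      // switchMap (called flatMapLatest in RxJS 4) unsubscribes from the
      // previous inner stream each time pauser emits.
      .switchMap(active => 
        // If active, subscribe to source, which first replays its cached value.
        // Otherwise switch to an empty Observable, which pauses delivery.
        Rx.Observable.if(() => active, source, Rx.Observable.empty())
      )
      .subscribe(/**/)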
    
    // Connect so the source starts emitting and caching its latest value
    source.connect();
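
To make the resulting behavior concrete, here is a minimal, dependency-free sketch of what the publishReplay(1) plus switchMap combination above is expected to do. It is only a model, not RxJS itself: createPausableLatest, push, and setActive are hypothetical names invented for this illustration. Values pushed while paused are not delivered, but the most recent one is cached and handed over on resume.

```javascript
// Hypothetical helper, NOT an RxJS API: a plain-JavaScript model of the
// publishReplay(1) + switchMap behavior described in the answer.
function createPausableLatest(onValue) {
  let active = false;   // starts paused, like a pauser that has not emitted yet
  let hasLast = false;  // whether the source has produced anything so far
  let last;             // the most recent value, kept even while paused

  return {
    // Call this for every value the source (e.g. the socket) produces.
    push(value) {
      last = value;
      hasLast = true;
      if (active) onValue(value);
    },
    // Call with true to resume and false to pause.
    setActive(next) {
      active = next;
      // Each `true` acts like a fresh subscription to the replayed source,
      // so the cached value (if any) is delivered immediately.
      if (active && hasLast) onValue(last);
    },
  };
}

const received = [];
const feed = createPausableLatest(v => received.push(v));

feed.setActive(true);   // begin the flow; nothing is cached yet
feed.push(1);           // delivered live
feed.setActive(false);  // pause
feed.push(2);           // not delivered, but remembered
feed.push(3);           // replaces 2 as the cached value
feed.setActive(true);   // resume: 3 is delivered
feed.push(4);           // delivered live

console.log(received);  // [ 1, 3, 4 ]
```

As in the RxJS version, resuming replays the cached value even if it had already been delivered before the pause. Also note that when driving the real pauser Subject in RxJS 5, the method is next(true) / next(false) rather than the RxJS 4 onNext.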