Search code examples
rxjspolling

How to make polling observable detect unfullfilled promise


This observable polls getPromise() function every second. After getPromise() function returns 3 promises it stops resolving them. How do I detect that getPromise() function hasn't resolve/rejected any promise for the past, let's say, 2 seconds, and call onError handler. I've tried making it work with timeout operator to no avail. Any ideas?

Rx.Observable.interval(1000)
  .switchMap(() => Rx.Observable.fromPromise(getPromise()))
  .subscribe(onValue, onError);

function onValue(value){
  console.log('value: ', value);
}
function onError(error){
  console.log('error: ', error);
}
var getPromise = (function(){
  var counter = 3;
  return function(){
    return new Promise(function(resolve, reject){
      if(counter > 0) resolve(1);
      counter--;
    })
  }
})();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.3.0/Rx.js"></script>


Solution

  • You can use the race operator that subscribes only to the first Observable that emits.

    You said you want to call onError handler after 2 of inactivity. This contradicts with using switchMap which automatically unsubscribes when a new Observable is returned from its callback. So you might want to use exhaustMap instead. Also when you emit an error notification the chain unsubscribes and you'll never receive any other value. This means that you shouldn't emit the timeout as an error or use also the retry operator to automatically resubscribe (but this really depends on what you're trying to achieve).

    This is you updated example that is just using the race() operator.

    Rx.Observable.interval(1000)
      .switchMap(() => 
        Rx.Observable.race(
          Rx.Observable.fromPromise(getPromise()),
          Rx.Observable.timer(0, 1000).mapTo(42)
        )
      )
      .subscribe(onValue, onError);
    
    function onValue(value){
      console.log('value: ', value);
    }
    function onError(error){
      console.log('error: ', error);
    }
    var getPromise = (function(){
      var counter = 3;
      return function(){
        return new Promise(function(resolve, reject){
          if(counter > 0) resolve(1);
          counter--;
        })
      }
    })();
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.3.0/Rx.js"></script>

    Edit: To send a single error notification after 2 seconds of inactivity.

    Rx.Observable.interval(1000)
      .switchMap(() => Rx.Observable.fromPromise(getPromise()))
      .timeout(2000)
      .subscribe(onValue, onError);
    
    function onValue(value){
      console.log('value: ', value);
    }
    function onError(error){
      console.log('error: ', error);
    }
    var getPromise = (function(){
      var counter = 3;
      return function(){
        return new Promise(function(resolve, reject){
          if(counter > 0) resolve(1);
          counter--;
        })
      }
    })();
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.2.0/Rx.js"></script>

    There's really a bug in 5.3.0 not directly in the timeout() operator but in scheduling async actions. https://github.com/ReactiveX/rxjs/pull/2580

    Without the timeout() operator:

    Rx.Observable.interval(1000)
      .switchMap(() =>
        Rx.Observable.race(
          Rx.Observable.fromPromise(getPromise()),
          Rx.Observable.timer(0, 2000).map(function(_) {
            throw new Error('timeout');
          })
        )
      )
      .subscribe(onValue, onError);