This observable polls the getPromise()
function every second. After the getPromise()
function has returned 3 promises, it stops resolving them. How do I detect that the getPromise()
function hasn't resolved/rejected any promise for the past, let's say, 2 seconds, and call the onError
handler? I've tried making it work with the timeout
operator, to no avail. Any ideas?
// Poll getPromise() once per second. switchMap drops the previous inner
// Observable each time a new tick arrives, so only the most recent
// promise can deliver a value to the subscriber.
Rx.Observable
  .interval(1000)
  .switchMap(function () {
    // Wrap the promise produced for this tick as an Observable.
    return Rx.Observable.fromPromise(getPromise());
  })
  .subscribe(onValue, onError);
/**
 * Next-notification handler: logs the received value to the console.
 *
 * Kept as a function declaration so it is hoisted and can be passed to
 * subscribe() before this point in the script.
 *
 * @param {*} value - The value delivered by the observable.
 */
function onValue(value) {
  const label = 'value: ';
  console.log(label, value);
}
/**
 * Error-notification handler: logs the received error to the console.
 *
 * Kept as a function declaration so it is hoisted and can be passed to
 * subscribe() before this point in the script.
 *
 * @param {*} error - The error delivered by the observable.
 */
function onError(error) {
  const label = 'error: ';
  console.log(label, error);
}
/**
 * Demo promise factory that simulates a source which goes silent.
 *
 * The first three calls return a promise resolved with 1. Every later
 * call returns a promise that is never resolved or rejected.
 *
 * @returns {Promise<number>} A promise that resolves with 1 or stays pending.
 */
var getPromise = (() => {
  // Number of calls that will still produce a resolved promise.
  let remaining = 3;

  return () =>
    new Promise((resolve) => {
      const shouldResolve = remaining > 0;
      remaining -= 1;
      if (shouldResolve) {
        resolve(1);
      }
    });
})();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.3.0/Rx.js"></script>
You can use the race
operator that subscribes only to the first Observable that emits.
You said you want to call the onError
handler after 2 seconds of inactivity. This conflicts with using switchMap,
which automatically unsubscribes from the previous inner Observable whenever a new one is returned from its callback (here every second, so an inner Observable never lives for the full 2 seconds). So you might want to use exhaustMap
instead. Also, when you emit an error notification the chain unsubscribes and you'll never receive any other value. This means that you either shouldn't emit the timeout as an error,
or you should also use the retry
operator to automatically resubscribe (but this really depends on what you're trying to achieve).
This is your updated example, which just uses the race()
operator.
// Poll once per second. For every tick, race the promise against a
// timer-based fallback; race() mirrors whichever of the two emits first.
Rx.Observable
  .interval(1000)
  .switchMap(() => {
    const fromPoll = Rx.Observable.fromPromise(getPromise());
    // Fallback source that maps every timer emission to the placeholder 42.
    const fallback = Rx.Observable.timer(0, 1000).mapTo(42);
    return Rx.Observable.race(fromPoll, fallback);
  })
  .subscribe(onValue, onError);
/**
 * Next-notification handler: logs the received value to the console.
 *
 * Kept as a function declaration so it is hoisted and can be passed to
 * subscribe() before this point in the script.
 *
 * @param {*} value - The value delivered by the observable.
 */
function onValue(value) {
  const label = 'value: ';
  console.log(label, value);
}
/**
 * Error-notification handler: logs the received error to the console.
 *
 * Kept as a function declaration so it is hoisted and can be passed to
 * subscribe() before this point in the script.
 *
 * @param {*} error - The error delivered by the observable.
 */
function onError(error) {
  const label = 'error: ';
  console.log(label, error);
}
/**
 * Demo promise factory that simulates a source which goes silent.
 *
 * The first three calls return a promise resolved with 1. Every later
 * call returns a promise that is never resolved or rejected.
 *
 * @returns {Promise<number>} A promise that resolves with 1 or stays pending.
 */
var getPromise = (() => {
  // Number of calls that will still produce a resolved promise.
  let remaining = 3;

  return () =>
    new Promise((resolve) => {
      const shouldResolve = remaining > 0;
      remaining -= 1;
      if (shouldResolve) {
        resolve(1);
      }
    });
})();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.3.0/Rx.js"></script>
Edit: To send a single error notification after 2 seconds of inactivity.
// Poll once per second and let timeout(2000) deliver a single error
// notification to onError once no value has arrived for 2 seconds.
Rx.Observable
  .interval(1000)
  .switchMap(() => {
    // Wrap the promise produced for this tick as an Observable.
    return Rx.Observable.fromPromise(getPromise());
  })
  .timeout(2000)
  .subscribe(onValue, onError);
/**
 * Next-notification handler: logs the received value to the console.
 *
 * Kept as a function declaration so it is hoisted and can be passed to
 * subscribe() before this point in the script.
 *
 * @param {*} value - The value delivered by the observable.
 */
function onValue(value) {
  const label = 'value: ';
  console.log(label, value);
}
/**
 * Error-notification handler: logs the received error to the console.
 *
 * Kept as a function declaration so it is hoisted and can be passed to
 * subscribe() before this point in the script.
 *
 * @param {*} error - The error delivered by the observable.
 */
function onError(error) {
  const label = 'error: ';
  console.log(label, error);
}
/**
 * Demo promise factory that simulates a source which goes silent.
 *
 * The first three calls return a promise resolved with 1. Every later
 * call returns a promise that is never resolved or rejected.
 *
 * @returns {Promise<number>} A promise that resolves with 1 or stays pending.
 */
var getPromise = (() => {
  // Number of calls that will still produce a resolved promise.
  let remaining = 3;

  return () =>
    new Promise((resolve) => {
      const shouldResolve = remaining > 0;
      remaining -= 1;
      if (shouldResolve) {
        resolve(1);
      }
    });
})();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.2.0/Rx.js"></script>
There really is a bug in 5.3.0, not directly in the timeout()
operator but in the scheduling of async actions (https://github.com/ReactiveX/rxjs/pull/2580), which is why the example above loads RxJS 5.2.0 instead.
Without the timeout()
operator:
// Poll once per second and report a 'timeout' error, without using the
// timeout() operator, when a polled promise has not settled within 2 seconds.
//
// exhaustMap is used instead of switchMap: switchMap would unsubscribe the
// pending inner Observable on the next 1-second tick, so a 2-second timer
// inside it could never fire. exhaustMap ignores new ticks while the current
// inner Observable is still active and accepts the next tick once it completes.
Rx.Observable.interval(1000)
  .exhaustMap(() =>
    Rx.Observable.race(
      Rx.Observable.fromPromise(getPromise()),
      // timer(2000) emits once, 2 seconds after subscription. The previous
      // timer(0, 2000) had an initial delay of 0 and therefore raised the
      // error immediately whenever the promise did not resolve.
      Rx.Observable.timer(2000).map(() => {
        throw new Error('timeout');
      })
    )
  )
  .subscribe(onValue, onError);