I am new to Reactive Extensions and cannot get something to work that I think should be a very common use case. I want to receive a value only after a specific period of time has passed without a new value arriving. In the example below this period is 1 second. The debounce operator seems to do exactly what I want, but I cannot get it to work.
const observable$ = new Rx.Observable(observer => {
  observer.next('start');
  setTimeout(() => {
    observer.next(1);
  }, 100);
  setTimeout(() => {
    observer.next(2);
  }, 200);
  setTimeout(() => {
    observer.next(3);
  }, 300);
  setTimeout(() => {
    observer.next(4);
  }, 400);
  setTimeout(() => {
    observer.next('end');
  }, 1500);
});
let sub = observable$
  .debounce(1000) // debounce(1000, null) does not work either
  .take(100)
  .subscribe(data => {
      console.log(data);
    },
    err => console.log(err.message),
    complete => console.log('Observable completed')
  )
What I want to get is console output of only:
"start"
"end"
In my IDE (WebStorm) the above code does not even compile, although the documentation states that the second argument is optional. On jsbin.com I get the following error: "this.durationSelector.call is not a function" (I admit I don't know yet how to apply schedulers in RxJS). The documentation uses only a number, too. Most examples of debounce I found on Google use only a number, e.g. this example on Stack Overflow. Why doesn't this work in my case?
Thanks for your help!
PS: I use rxjs 5.0.0-beta.6.
EDIT: With the help of the answers here I found the solution I actually wanted:
const observable$ = new Rx.Observable(observer => {
  observer.next('start');
  setTimeout(() => {
    observer.next(1);
  }, 1100); // <-- If you change 1100 to e.g. 900, you get only "end" in the output, because there is no 1 s period during which no new value arrives.
  setTimeout(() => {
    observer.next(2);
  }, 1200);
  setTimeout(() => {
    observer.next(3);
  }, 1300);
  setTimeout(() => {
    observer.next(4);
  }, 1400);
  setTimeout(() => {
    observer.next(5);
  }, 1500);
  setTimeout(() => {
    observer.next('end');
  }, 1501);
});
let sub = observable$
  .debounceTime(1000)
  .take(10)
  .subscribe(data => {
      console.log(data);
    },
    err => console.log(err.message),
    complete => console.log('Observable completed')
  );
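To see why the timings matter, here is a minimal sketch in plain JavaScript that models the debounce rule itself rather than using RxJS. The helper name simulateDebounceTime and the [timeMs, value] event format are made up for illustration. It assumes the source never completes and ignores scheduler details and exact ties.

```javascript
// Hypothetical helper (not part of RxJS): models a fixed debounce window on
// a list of [timeMs, value] pairs that is sorted by time.
// A value survives only if no later value arrives within `dueMs` after it.
function simulateDebounceTime(events, dueMs) {
  const emitted = [];
  events.forEach(([time, value], i) => {
    const next = events[i + 1];
    // The last value always survives: nothing arrives later to cancel it.
    // Simplification: a gap of exactly `dueMs` counts as long enough.
    if (next === undefined || next[0] - time >= dueMs) {
      emitted.push(value);
    }
  });
  return emitted;
}

// Timings from the EDIT: the first gap is 1100 ms, longer than the window.
const editTimings = [
  [0, 'start'], [1100, 1], [1200, 2], [1300, 3], [1400, 4], [1500, 5], [1501, 'end'],
];
// Same timings, but the first numeric value arrives after 900 ms instead.
const shortGapTimings = [
  [0, 'start'], [900, 1], [1200, 2], [1300, 3], [1400, 4], [1500, 5], [1501, 'end'],
];
// Timings from the original question.
const questionTimings = [
  [0, 'start'], [100, 1], [200, 2], [300, 3], [400, 4], [1500, 'end'],
];

console.log(simulateDebounceTime(editTimings, 1000));     // [ 'start', 'end' ]
console.log(simulateDebounceTime(shortGapTimings, 1000)); // [ 'end' ]
console.log(simulateDebounceTime(questionTimings, 1000)); // [ 4, 'end' ]
```

Under this model, the original timings would give 4 and "end" with a fixed 1 second window, because the only quiet gap of at least 1 second comes after 4, not after "start".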
You can do something like this:
const observable$ = new Rx.Observable(observer => {
  observer.next('start');
  setTimeout(() => {
    observer.next(1);
  }, 100);
  setTimeout(() => {
    observer.next(2);
  }, 200);
  setTimeout(() => {
    observer.next(3);
  }, 300);
  setTimeout(() => {
    observer.next(4);
  }, 400);
  setTimeout(() => {
    observer.next('end');
  }, 1500);
});
let sub = observable$
  .map(function(x, i) {
    return {
      val: x,
      index: i
    };
  })
  .debounce(function(obj) {
    let interval = obj.index === 0 ? 0 : 1500;
    return Rx.Observable.timer(interval);
  })
  .take(100)
  .subscribe(data => {
      console.log(data.val);
    },
    err => console.log(err.message),
    complete => console.log('Observable completed')
  )
The key here is to use the map function to get the index of each element and then decide the waiting interval based on that index.
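The effect of choosing the interval per element can be checked without RxJS. The following is a rough model in plain JavaScript, not the library's implementation. The names simulateDebounce and durationFor are hypothetical, and the sketch assumes the source never completes and ignores scheduler details.

```javascript
// Hypothetical helper (not part of RxJS): models debounce with a per-value
// waiting interval on a time-sorted list of [timeMs, value] pairs.
// `durationFor(value, index)` plays the role of the duration selector.
function simulateDebounce(events, durationFor) {
  const emitted = [];
  events.forEach(([time, value], index) => {
    const next = events[index + 1];
    const wait = durationFor(value, index);
    // A value survives if nothing newer arrives before its own wait elapses.
    // Simplification: a gap exactly equal to the wait counts as long enough.
    if (next === undefined || next[0] - time >= wait) {
      emitted.push(value);
    }
  });
  return emitted;
}

// Same timings as in the code above.
const timings = [
  [0, 'start'], [100, 1], [200, 2], [300, 3], [400, 4], [1500, 'end'],
];

// First element passes immediately; every later one must survive 1500 ms.
const byIndex = simulateDebounce(timings, (value, index) => (index === 0 ? 0 : 1500));
console.log(byIndex); // [ 'start', 'end' ]
```

Because the first element gets a wait of 0, "start" is let through even though 1 follows after only 100 ms, while 1 through 4 are each cancelled by the next value.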