Search code examples
javascriptrxjsrxjs5reactive-extensions-js

RxJs: New value only after a period of time has passed


I am new to ReactiveExtensions and I do not get to work s.th. that I think should be a very common usecase. I want to receive a new value only after a specific period of time has passed without a new next value. In the bottom example this period of time is 1 second. The debounce-operator seems to do exactly what I want. I do not get it to work though.

const observable$ = new Rx.Observable(observer => {

 observer.next('start');

 setTimeout(() => {
  observer.next(1);
 }, 100);
 setTimeout(() => {
  observer.next(2);
 }, 200);
 setTimeout(() => {
  observer.next(3);
 }, 300);
 setTimeout(() => {
  observer.next(4);
 }, 400);

 setTimeout(() => {
  observer.next('end');
 }, 1500);
});

let sub = observable$
      .debounce(1000) //debounce(1000, null) does not work either
      .take(100)
      .subscribe(data => {
          console.log(data);
         }, 
         err => console.log(err.message), 
         complete => console.log('Observable completed')
       )

What I want to get is a console-output of only:

"start"
"end"

In my IDE (Webstorm) the above code does not even compile although the documentation states that the second argument is optional. On jsbin.com i get the following error: "this.durationSelector.call is not a function" (I admit, I dont know yet how to apply schedulers in rxjs). In the documentation they use only a number, too. Most examples of debounce i found on google use only a number, i.e. this example on Stackoverflow. Why doesn't this work in my case?

Thanks for your help!

PS: I use rxjs 5.0.0-beta.6.

EDIT: With the help of the answers here I did find the actual soltion I wanted:

const observable$ = new Rx.Observable(observer => {

observer.next('start');

 setTimeout(() => {
  observer.next(1);
 }, 1100); //<-- If you change 1100 to i.e. 900 you just get "end" in the output, because there is no 1s periode during which no new value arrives. 
 setTimeout(() => {
  observer.next(2);
 }, 1200);
 setTimeout(() => {
  observer.next(3);
 }, 1300);
 setTimeout(() => {
  observer.next(4);
 }, 1400);
 setTimeout(() => {
  observer.next(5);
 }, 1500);

 setTimeout(() => {
  observer.next('end');
 }, 1501);


});

let sub = observable$
      .debounceTime(1000)
      .take(10)
      .subscribe(data => {
         console.log(data);
       }, 
       err => console.log(err.message), 
       complete => console.log('Observable completed')
);

Solution

  • You can do something like this

    const observable$ = new Rx.Observable(observer => {
    
        observer.next('start');
    
        setTimeout(() => {
            observer.next(1);
        }, 100);
        setTimeout(() => {
            observer.next(2);
        }, 200);
        setTimeout(() => {
            observer.next(3);
        }, 300);
        setTimeout(() => {
            observer.next(4);
        }, 400);
    
        setTimeout(() => {
            observer.next('end');
        }, 1500);
    
    
    });
    
    let sub = observable$
        .map(function(x, i) {
            return {
                val: x,
                index: i
            };
        })
        .debounce(function(obj) {
            let interval = obj.index === 0 ? 0 : 1500;
            return Rx.Observable.timer(interval);
        })
        .take(100)
        .subscribe(data => {
                console.log(data.val);
            },
            err => console.log(err.message),
            complete => console.log('Observable completed')
        )
    

    The key here is to use map function to get the index of the element and then decide the waiting interval.