Get one event per 1000 milliseconds interval


I have an infinite stream of events that can emit bursts of consecutive events, and I want to take one event per 1000 milliseconds.

I tried debounceTime / auditTime / throttleTime, but none of them includes all the events I want. To demonstrate the behavior I created a playground on StackBlitz that fires one event every 300 ms within a burst of 10 events:

  • debounceTime(1000) will give only event #10

  • throttleTime(1000) will give events 1,5,9 but it will omit #10 which is essential

  • auditTime(1000) will give events 4,8

What I want here is to get events 1,5,9,10 (one event per 1000ms interval). How do I achieve this?
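
To pin down the target behavior, here is a minimal plain-TypeScript model of the selection rule (no RxJS involved). It assumes the rule is "keep the first event of each 1000 ms window, plus the very last event of the burst"; `TimedEvent` and `pickLeadingPlusLast` are hypothetical names used only for this illustration.

```typescript
// Hypothetical model of the desired selection; not an RxJS operator.
interface TimedEvent {
  id: number; // event number within the burst
  at: number; // emission time in milliseconds
}

function pickLeadingPlusLast(events: TimedEvent[], windowMs: number): number[] {
  const kept: number[] = [];
  let windowStart = -Infinity;
  for (const e of events) {
    // An event is kept when no window is currently open.
    if (e.at - windowStart >= windowMs) {
      kept.push(e.id);
      windowStart = e.at;
    }
  }
  // Also keep the final event of the burst unless it was already kept.
  const last = events[events.length - 1];
  if (last !== undefined && kept[kept.length - 1] !== last.id) {
    kept.push(last.id);
  }
  return kept;
}

// Ten events fired 300 ms apart, as in the playground.
const burst: TimedEvent[] = Array.from({ length: 10 }, (_, i) => ({
  id: i + 1,
  at: (i + 1) * 300,
}));
const picked = pickLeadingPlusLast(burst, 1000);
console.log(picked); // [ 1, 5, 9, 10 ]
```

Under this model a burst of only 9 events yields 1, 5, 9, with no extra trailing event, because #9 is already the last one.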

import { interval } from "rxjs";
import { map, take, tap } from "rxjs/operators";

const events$ = interval(300).pipe(
  map(num => `Firing event ${num + 1}`)
);

const source = events$.pipe(
  tap(console.log),
  take(10),
  // make some debouncing??
  map(x => `Received ${x}!`)
);

source.subscribe(x =>
  console.log(
    "%c" + x,
    "background: purple; color: white; padding: 3px 5px; border-radius: 3px;"
  )
);

I also tried combining zip / combineLatest with values emitted via interval, but had no luck with that.


Solution

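  • UPDATED (based on the discussion in the comments)

    Merge a throttled and a debounced view of the same stream: throttleTime(1000) supplies the leading event of each interval, and debounceTime(1000) supplies the final event of each burst.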

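    import { merge, timer } from "rxjs";
    import {
      debounceTime, distinctUntilChanged, map, switchMap, take, tap, throttleTime
    } from "rxjs/operators";

    // Three bursts, 6 seconds apart; the last burst has 9 events instead of 10.
    const events$ = timer(0, 6000).pipe(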
      take(3),
      switchMap(x =>
        timer(0, 300).pipe(
          map(num => `event #${num + 1}`),
          take(x > 1 ? 9 : 10)
        )
      )
    );
    
    const source = merge(
      events$.pipe(
        tap(e => console.log(`%cStream: ${e}`, "color: blue;")),
        debounceTime(1000),
        tap(x => console.log(`%cdebounceTime captured: ${x}`, "color: red;"))
      ),
      events$.pipe(
        throttleTime(1000),
        tap(x => console.log(`%cthrottleTime captured: ${x}`, "color: green;"))
      ),
    ).pipe(
      // Drop the duplicate produced when throttleTime and debounceTime both
      // capture the same event (e.g. #9 in the last burst).
      // This only works if every event is unique; otherwise use the original solution below.
      distinctUntilChanged(),
      map(x => `Received ${x}!`)
    );
    
    source.subscribe(x =>
      console.log(
        "%c" + x,
        "background: purple; color: white; padding: 3px 5px; border-radius: 3px;"
      )
    );
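
Why the duplicate appears can be checked with a small plain-TypeScript model of the 9-event burst (no RxJS involved). It assumes that the throttle keeps each event that opens a new 1000 ms window and that the debounce keeps only the last event of the burst, since gaps inside the burst are shorter than 1000 ms. `throttlePicks` and `dropConsecutiveDuplicates` are hypothetical helper names for this sketch.

```typescript
// Illustrative model only; these helpers are not RxJS APIs.
function throttlePicks(times: number[], windowMs: number): number[] {
  // Returns the 1-based numbers of the events that open a new throttle window.
  const picks: number[] = [];
  let openedAt = -Infinity;
  times.forEach((t, i) => {
    if (t - openedAt >= windowMs) {
      picks.push(i + 1);
      openedAt = t;
    }
  });
  return picks;
}

function dropConsecutiveDuplicates<T>(values: T[]): T[] {
  // Mirrors the idea of distinctUntilChanged: skip a value equal to its predecessor.
  return values.filter((v, i) => i === 0 || v !== values[i - 1]);
}

// Last burst from the answer: 9 events, 300 ms apart, starting at 0 ms.
const times = Array.from({ length: 9 }, (_, i) => i * 300);
const throttled = throttlePicks(times, 1000); // leading events of each window
const debounced = [times.length];             // only the final event of the burst
const merged = [...throttled, ...debounced];  // throttle output arrives first
const deduped = dropConsecutiveDuplicates(merged);

console.log(merged);  // [ 1, 5, 9, 9 ]
console.log(deduped); // [ 1, 5, 9 ]
```

In a 10-event burst the two views never overlap (1, 5, 9 from the throttle and 10 from the debounce), so the deduplication step only matters when the last event of a burst also opens a throttle window.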
    

    ORIGINAL

    I hope that's what you wanted: https://take.ms/VP7tA

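    import { concat, EMPTY, interval, merge, Subject, timer } from "rxjs";
    import { filter, map, scan, switchMapTo, take, tap } from "rxjs/operators";

    const events$ = interval(300).pipe(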
        map(num => `Firing event ${num + 1}`)
    );
    
    const source = concat(events$.pipe(
        tap(console.log),
        take(10),
    ), timer(1000).pipe(switchMapTo(EMPTY)), events$.pipe(
        tap(console.log),
        take(10),
    ));
    
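    // Handle of the pending timeout that will emit the trailing event, if any.
    let lastTimer: ReturnType<typeof setTimeout> | undefined;
    // Carries the trailing event of a burst; the events here are strings.
    const last$ = new Subject<string>();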
    merge(
        source.pipe(
          scan((state, event) => {
            state[1] = null;
            const stamp = new Date().getTime();
            clearTimeout(lastTimer);
            if (stamp - state[0] < 1000) {
              lastTimer = setTimeout(() => last$.next(event), (stamp - state[0]) + 50);
              return state;
            }
            state[0] = stamp;
            state[1] = event;
            return state;
          }, [0, null]),
          filter(([, event]) => event !== null),
          map(([, event]) => event || 0),
        ),
        last$,
    ).pipe(
        map(x => `Received ${JSON.stringify(x)}!`)
    ).subscribe(x =>
        console.log(
            "%c" + x,
            "background: purple; color: white; padding: 3px 5px; border-radius: 3px;"
        )
    );