Search code examples
typescriptrxjsobservable

RxJS observable that fires if it's the first of a kind OR a certain time went by


Hey RxJS pros out there,

I have a stream that procs multiple times a second. Now I want an observer that procs if it's the first of a kind OR a certain time went by after the last proc. I want to achieve this with pure RxJS without additional "helper variables".

Scenario:

const list = ["foo", "foo", "bar", "foo",
              "foo", "foo", "foo", "foo",
              "foo", "foo", "bar", "foo"];

// in real world obs$ drops "foo" and "bar" randomly infinite times
const obs$ = timer(0, 100).pipe(take(12)); 

$obs
  .pipe(map((v, i)=>list[i]+"#"+i))
  .subscribe(console.log); 

Below are all the values fired by the observable. I want to catch the green ones (✅), and ignore the red ones (❌).

// ⬇
"foo#1"  // ✅ first of a kind
"foo#2"  // ❌
"bar#3"  // ✅ first of a kind
"foo#4"  // ✅ first of a kind
"foo#5"  // ❌
"foo#6"  // ❌
"foo#7"  // ❌
"foo#8"  // ❌
"foo#9"  // ✅ <-- I want this one too, because a certain time (0.5 seconds) went by
"foo#10" // ❌
"bar#11" // ✅ first of a kind
"foo#12" // ✅ first of a kind

So I want this output:

1#foo
3#bar
4#foo
9#foo
11#bar
12#foo

How?


Solution

  • You can use the scan operator to compute a "state" based on a prior state and the incoming emission.

    In this case, our "state" will be an object with all the info needed to determine whether a given emission should be emitted or not. We can provide an initial state that the scan function will use when handling the first emission. After that, it will use the value we return as the state.

    Here's what we can use as the initial state:

    {
      startTime: Date.now(),
      previousKind: '',
      shouldEmit: true,
      item: '',
    }
    
    • startTime is used to determine if enough time has passed to force emitting.
    • previousKind is to keep track of the kind of item previously emitted, so we can determine if the current item is different from the previous.
    • shouldEmit is a boolean to indicate if this item should be emitted.
    • item is just the emitted item.

    This info will be used inside our scan operator below to generate the new state and this is the same shape that will be emitted by the scan operator:

    obs$.pipe(
      scan(/* generates state - details revealed later */),
      filter(state => state.shouldEmit),
      map(state => state.item)
    ).subscribe(
      item => console.log(`✅ ${item}`)
    ); 
    

    You can see we apply our scan operation, then simply filter out items that are not marked as shouldEmit, then, we use map to emit the original item.


    Here is the contents of the scan operator.

      scan((state, item) => {
        const kind = item.split('#')[0];
        const isFirstOfKind = kind !== state.previousKind;
        const durationExceeded = Date.now() - state.startTime > DURATION;
        const shouldEmit = isFirstOfKind || durationExceeded;
    
        return {
          startTime: shouldEmit ? Date.now() : state.startTime,
          previousKind: kind,
          shouldEmit,
          item,
        }
      }, {
        startTime: Date.now(),
        previousKind: '',
        shouldEmit: true,
        item: '',
      })
    

    You can see we pass scan a function that receives the prior state and the current emission ('item'). With this info, we return a new state that will be emitted to operators downstream. This state is also available the next time scan receives an emission.


    Here is a StackBlitz demo.