Hey RxJS pros out there,
I have a stream that procs multiple times a second. Now I want an observer that procs if it's the first of a kind OR a certain time went by after the last proc. I want to achieve this with pure RxJS without additional "helper variables".
Scenario:
const list = ["foo", "foo", "bar", "foo",
"foo", "foo", "foo", "foo",
"foo", "foo", "bar", "foo"];
// in real world obs$ drops "foo" and "bar" randomly infinite times
const obs$ = timer(0, 100).pipe(take(12));
$obs
.pipe(map((v, i)=>list[i]+"#"+i))
.subscribe(console.log);
Below are all the values fired by the observable. I want to catch the green ones (✅), and ignore the red ones (❌).
// ⬇
"foo#1" // ✅ first of a kind
"foo#2" // ❌
"bar#3" // ✅ first of a kind
"foo#4" // ✅ first of a kind
"foo#5" // ❌
"foo#6" // ❌
"foo#7" // ❌
"foo#8" // ❌
"foo#9" // ✅ <-- I want this one too, because a certain time (0.5 seconds) went by
"foo#10" // ❌
"bar#11" // ✅ first of a kind
"foo#12" // ✅ first of a kind
So I want this output:
1#foo
3#bar
4#foo
9#foo
11#bar
12#foo
How?
You can use the scan
operator to compute a "state" based on a prior state and the incoming emission.
In this case, our "state" will be an object with all the info needed to determine whether a given emission should be emitted or not. We can provide an initial state that the scan function will use when handling the first emission. After that, it will use the value we return as the state.
Here's what we can use as the initial state:
{
startTime: Date.now(),
previousKind: '',
shouldEmit: true,
item: '',
}
startTime
is used to determine if enough time has passed to force emitting.previousKind
is to keep track of the kind of item previously emitted, so we can determine if the current item is different from the previous.shouldEmit
is a boolean to indicate if this item should be emitted.item
is just the emitted item.This info will be used inside our scan operator below to generate the new state and this is the same shape that will be emitted by the scan operator:
obs$.pipe(
scan(/* generates state - details revealed later */),
filter(state => state.shouldEmit),
map(state => state.item)
).subscribe(
item => console.log(`✅ ${item}`)
);
You can see we apply our scan operation, then simply filter
out items that are not marked as shouldEmit
, then, we use map
to emit the original item.
Here is the contents of the scan
operator.
scan((state, item) => {
const kind = item.split('#')[0];
const isFirstOfKind = kind !== state.previousKind;
const durationExceeded = Date.now() - state.startTime > DURATION;
const shouldEmit = isFirstOfKind || durationExceeded;
return {
startTime: shouldEmit ? Date.now() : state.startTime,
previousKind: kind,
shouldEmit,
item,
}
}, {
startTime: Date.now(),
previousKind: '',
shouldEmit: true,
item: '',
})
You can see we pass scan
a function that receives the prior state and the current emission ('item'). With this info, we return a new state that will be emitted to operators downstream. This state is also available the next time scan
receives an emission.