I have the following need:
I want to throttle a function call using rxjs with both `leading` and `trailing` set to true. However, I don't want to ignore the function calls that were skipped; I want to include the output of all function calls as an array, both at the beginning and at the end of the throttle. Kind of like combining `throttle` and `buffer`.
To give a better example:
I only want to emit the number of clicks every 1 second. If there is a long pause, when the next click happens, emit that click's x and y right away (in an array, as if it was buffered). All clicks after that first click, up until 1 second has passed, will be observed as an array once that second arrives.
Here is some code I tried that doesn't seem to work (tryable on this codesandbox):
import { pipe, asyncScheduler, interval } from "rxjs";
import { publish, throttleTime, buffer, take } from "rxjs/operators";

const throttledBuffer = (throttleTimeMs) => {
  return pipe(
    // publish this observable so it can be shared
    publish((observable$) => {
      // get invocations, and throttle them
      const throttleCalls$ = observable$.pipe(
        throttleTime(throttleTimeMs, asyncScheduler, {
          leading: true,
          trailing: true
        })
      );
      // buffer observable, emitting an array of accumulated values for each fn call at each observed throttle
      return observable$.pipe(buffer(throttleCalls$));
    })
  );
};

const clicks$ = interval(50);
clicks$.pipe(throttledBuffer(100), take(4)).subscribe({
  next: (value) => {
    console.log(value);
  }
});
In the above code snippet, I would expect to have the result:
[0]
[1, 2]
[3]
[4, 5]
However, this is what happens:
[]
[0,1,2]
[]
[3,4,5]
It seems as if the buffer is occurring before the throttle can emit something, or after it and skips it. Has anyone succeeded in making a custom operator like this?
Here would be an approach:
import { pipe, asyncScheduler, interval, merge } from "rxjs";
import { publish, throttleTime, buffer, take, share, ignoreElements } from "rxjs/operators";

const throttledBuffer = (throttleTimeMs) => {
  return pipe(
    publish((observable$) => {
      const throttleCalls$ = observable$.pipe(
        // tap((v) => console.log("in throttle", v)),
        throttleTime(throttleTimeMs, asyncScheduler, {
          leading: true,
          trailing: true
        }),
        // tap(() => console.log("after throttle"))
      );
      const src$ = observable$.pipe(share());
      return merge(
        src$.pipe(ignoreElements()),
        src$.pipe(/* tap(console.log), */ buffer(throttleCalls$))
      );
    })
  );
};

const clicks$ = interval(50);
clicks$.pipe(throttledBuffer(100), take(4)).subscribe({
  next: (value) => {
    console.log("p", value);
  }
});
/*
console output:
p
(1) [0]
p
(2) [1, 2]
p
(1) [3]
p
(2) [4, 5]
*/
You can uncomment the `tap`s (importing `tap` from "rxjs/operators") in order to examine what's going on.
A working demo can be found here.
`publish` operator
First, it's important to highlight how the `publish` operator works:
export function publish<T, R>(selector?: OperatorFunction<T, R>): MonoTypeOperatorFunction<T> | OperatorFunction<T, R> {
  return selector ? connect(selector) : multicast(new Subject<T>());
}
In this case, we're interested in the `selector` approach. But, as you can see, if you were to use simply `publish()`, it would use `multicast` with a `Subject`. As a side note, `publishReplay` uses `multicast` too, but with a `ReplaySubject` instead of a `Subject`.
The `connect` function is defined as follows:
// `source` - the further-up observable in the chain
// `subscriber` - the newly created subscriber
/*
To get a better understanding of what `subscriber` and `source` really are, let's consider this quick example
const o$ = new Observable(subscriber => {});
o$.pipe(
map(...),
).subscribe({ next: v => console.log('hello', v) })
In the `map`'s implementation, when the `map` observable is subscribed, it will call again `source.subscribe(someSubscriber)`.
In that case, `someSubscriber` is a `{ next: v => console.log('hello', v) }`(roughly) and `source` is `o$`.
So, in `subscriber => {}` from `Observable`'s callback, the `subscriber` will actually be `someSubscriber`.
*/
// It is a `Subject` instance
const subject = connector();
from(selector(fromSubscribable(subject))).subscribe(subscriber);
subscriber.add(source.subscribe(subject));
`fromSubscribable(subject)` will return an `Observable` instance that, when subscribed to, adds the subscriber to the `subject`'s subscribers list. So, the argument to the callback function you provided to `publish` will be that `Observable` instance. The point of all of this is to know that every subscriber created inside that callback will be part of the `subject`'s subscribers list. Then, with `subscriber.add(source.subscribe(subject))`, the `subject` subscribes to the `source`, meaning that all the values which come from the `source` will be received by the `subject`, which in turn sends them to all of the subscribers in that list.
So, the gist is that the order in which you create your subscribers in that callback function is important.
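To make the ordering point concrete, here is a minimal sketch in plain JavaScript. `TinySubject` is a hypothetical stand-in, not the actual RxJS `Subject`; it only assumes what is described above, namely that subscribers are kept in a list and are notified synchronously, in the order in which they subscribed.

```javascript
// Hypothetical, simplified stand-in for a Subject (not the RxJS implementation).
class TinySubject {
  constructor() {
    this.subscribers = [];
  }

  // Register a callback at the end of the subscribers list.
  subscribe(next) {
    this.subscribers.push(next);
  }

  // Deliver the value synchronously, in subscription order.
  next(value) {
    for (const subscriber of this.subscribers) {
      subscriber(value);
    }
  }
}

const subject = new TinySubject();
const log = [];
subject.subscribe((v) => log.push(`first got ${v}`));
subject.subscribe((v) => log.push(`second got ${v}`));

subject.next(0);
console.log(log); // [ 'first got 0', 'second got 0' ]
```

Whoever subscribes first always sees a value before anyone who subscribed later, which is why the creation order inside the callback matters.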
Let's now see what the problem was in the first place:
const throttledBuffer = (throttleTimeMs) => {
  return pipe(
    publish((observable$) => {
      const throttleCalls$ = observable$.pipe(
        throttleTime(throttleTimeMs, asyncScheduler, {
          leading: true,
          trailing: true
        })
      );
      return observable$.pipe(buffer(throttleCalls$));
    })
  );
};

const clicks$ = interval(50);
clicks$.pipe(throttledBuffer(100), take(4)).subscribe({
  next: (value) => {
    console.log("p", value);
  }
});
/*
console output:
p
[]
p
(3) [0, 1, 2]
p
[]
p
(3) [3, 4, 5]
*/
Returning `observable$.pipe(buffer(throttleCalls$))` is essentially the same as doing
observable$.pipe(buffer(throttleCalls$)).subscribe(subscriber /* #1 */);
And here is, finally :), the problem: `throttleCalls$` will be subscribed first, because the concept behind this can be boiled down to `f(g(x))`, where `g(x)` will be called first. Because `throttleCalls$` is subscribed first, that `subject` instance will register its subscriber as the first one in the list. As a result, `subscriber#1` will be the second one. This means that when the source emits something, the subscriber created from `throttleCalls$`'s subscription will be the first one to receive the value, and that's why you'll get an empty array first.
Remember that a `Subject` sends the values to its subscribers synchronously. This means that immediately after the buffer emitted `[]`, the second subscriber will receive the value and the `buffer`'s array will be `[0]`. Since the throttle window (100 ms in this example) has not yet elapsed, `1` and `2` will make it into the buffer too, and when the window eventually elapses, the `buffer` will emit `[0, 1, 2]`. Then, again, `3` is emitted, but the first subscriber to receive it will be the one which belongs to `throttleCalls$`, which is the `buffer`'s notifier, so, again, we'd get an empty array.
So, the problem is that the order in which the subscribers are added into the `Subject`'s subscribers list is not the one we want.
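The effect of the subscription order on the first emitted array can be reproduced without RxJS. The sketch below is a deliberately simplified model, not the library's internals: the subject is just a list of callbacks called in order, `notifier` stands for the leading edge of the throttle (it flushes the buffer as soon as it sees a value), and `bufferSource` stands for the `buffer`'s subscription to the source. All of these names are made up for illustration.

```javascript
// Simplified model (an assumption, not RxJS internals): a subject is a list of
// callbacks invoked synchronously in subscription order.
function simulateFirstEmission(notifierFirst) {
  const subscribers = [];
  const emitted = [];
  let buffered = [];

  // Models the leading edge of the throttle: the value passes straight
  // through and makes the buffer flush whatever it has collected so far.
  const notifier = () => {
    emitted.push(buffered);
    buffered = [];
  };

  // Models the buffer's subscription to the source: collect the value.
  const bufferSource = (value) => buffered.push(value);

  if (notifierFirst) {
    subscribers.push(notifier, bufferSource);
  } else {
    subscribers.push(bufferSource, notifier);
  }

  // The source emits its first value, 0.
  for (const subscriber of subscribers) {
    subscriber(0);
  }
  return emitted;
}

console.log(simulateFirstEmission(true)); // [ [] ]
console.log(simulateFirstEmission(false)); // [ [ 0 ] ]
```

With the notifier first, the flush happens before `0` reaches the buffer, so an empty array comes out; with the source first, `0` is already in the buffer when the flush happens.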
Let's see once again the solution from the TLDR section:
const throttledBuffer = (throttleTimeMs) => {
  return pipe(
    publish((observable$) => {
      const throttleCalls$ = observable$.pipe(
        // tap((v) => console.log("in throttle", v)),
        throttleTime(throttleTimeMs, asyncScheduler, {
          leading: true,
          trailing: true
        }),
        // tap(() => console.log("after throttle"))
      );
      const src$ = observable$.pipe(share());
      return merge(
        src$.pipe(ignoreElements()),
        src$.pipe(/* tap(console.log), */ buffer(throttleCalls$))
      );
    })
  );
};

const clicks$ = interval(50);
clicks$.pipe(throttledBuffer(100), take(4)).subscribe({
  next: (value) => {
    console.log("p", value);
  }
});
/*
console output:
p
(1) [0]
p
(2) [1, 2]
p
(1) [3]
p
(2) [4, 5]
*/
With the bit of knowledge acquired from the previous section, let's go step by step to understand what's going on. We know the problem was the order, so we could work on that. We can get the order we want by using `merge` like this:
merge(
  // `ignoreElements()` will ignore `next`, but will let `error`/`complete` notifications through
  // By providing this as the first argument, we can be sure that the first emitted value
  // will **first** become part of the buffer, and only then will the `buffer`'s notifier emit
  src$.pipe(ignoreElements()),
  // Ensuring the `buffer`'s notifier is the **second** in the list
  src$.pipe(/* tap(console.log), */ buffer(throttleCalls$))
);
However, there is one more thing to do in order to make sure everything works as expected, which is
const src$ = observable$.pipe(share());
What `share()` does is add a `Subject` instance between the data consumers (the two arguments of `merge`) and the data producer (`observable$`).
The reason we needed this is best explained with the help of this little diagram:
              s1 (the `observable$` argument)
             /  \
           s2    throttleCalls$
          /  \
(1)ignoreElements   (2)buffer(throttleCalls$)

s2 - the Subject we got from `share()`
So, when `s1` receives a value, `s2` will receive it first, then `(1)`, then `(2)`, and then `throttleCalls$`. And this is what we want, because we first add to the buffer, and only after that does the `buffer`'s notifier (which is `throttleCalls$`) emit.
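The delivery order described by the diagram can be traced with a small plain-JavaScript model. This is only a sketch under the same simplifying assumption as before (a subject is a list of callbacks called synchronously in subscription order); `makeSubject` and the labels are hypothetical names, and the subscriber lists are simply set up to mirror the diagram.

```javascript
// Simplified model (an assumption, not RxJS internals).
const order = [];

const makeSubject = (name) => {
  const subscribers = [];
  return {
    subscribe: (fn) => subscribers.push(fn),
    next: (value) => {
      order.push(name);
      // Synchronous delivery, in subscription order.
      subscribers.forEach((fn) => fn(value));
    },
  };
};

const s1 = makeSubject("s1"); // the `observable$` argument
const s2 = makeSubject("s2"); // the Subject added by `share()`

// Subscribers list of s1: [s2, throttleCalls$]
s1.subscribe((value) => s2.next(value));
s1.subscribe(() => order.push("throttleCalls$"));

// Subscribers list of s2: [(1) ignoreElements, (2) buffer's source]
s2.subscribe(() => order.push("(1) ignoreElements"));
s2.subscribe(() => order.push("(2) buffer source"));

s1.next(0);
console.log(order);
// [ 's1', 's2', '(1) ignoreElements', '(2) buffer source', 'throttleCalls$' ]
```

Because `s2` sits ahead of `throttleCalls$` in the list of `s1`, everything hanging off `s2` is served before the throttle sees the value.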
Now you might wonder, why wouldn't it work like this?
// const src$ = observable$.pipe(share());
return merge(
  observable$.pipe(ignoreElements()),
  observable$.pipe(/* tap(console.log), */ buffer(throttleCalls$))
);
The diagram for the above snippet would look like this:
              s1 (the `observable$` argument)
             /  \
(1)ignoreElements   (2)buffer(throttleCalls$)
Well, we came back to the same problem we had initially: `throttleCalls$` is subscribed before the `buffer`'s source, because `(1)` and `(2)` are completely independent. The subscribers list would (roughly) look like this:
[
ignoreElements,
throttleCalls$,
(2) - the `buffer`'s source
]