javascript rxjs reactive-programming

In RxJS, how can you make a throttled buffer operator?


I have the following need:

I want to throttle a function call using RxJS with both leading and trailing set to true. However, I don't want to drop the calls that were skipped; I want to emit the output of all function calls as an array, at both the leading and the trailing edge of the throttle. It is kind of like combining throttle and buffer.

To give a better example:

I only want to emit clicks once every 1 second. If there is a long pause, then when the next click happens, emit that click's x and y right away (in an array, as if it were buffered). All clicks after that first click, up until 1 second has passed, will be emitted as an array once that second elapses.

Here is some code I tried that doesn't seem to work (it can be tried on this codesandbox):

import { pipe, asyncScheduler, interval } from "rxjs";
import { publish, throttleTime, buffer, take } from "rxjs/operators";

const throttledBuffer = (throttleTimeMs) => {
  return pipe(
    // publish this observable so it can be shared
    publish((observable$) => {
      // get invocations, and throttle them
      const throttleCalls$ = observable$.pipe(
        throttleTime(throttleTimeMs, asyncScheduler, {
          leading: true,
          trailing: true
        })
      );

      // buffer observable, emitting an array of accumulated values for each fn call at each observed throttle
      return observable$.pipe(
        buffer(throttleCalls$)
      );
    })
  );
};


const clicks$ = interval(50);


clicks$.pipe(throttledBuffer(100), take(4)).subscribe({
  next: (value) => {
    console.log(value);
  }
});

In the above code snippet, I would expect to have the result:

[0]
[1, 2]
[3]
[4, 5]

However, this is what happens:

[]
[0,1,2]
[]
[3,4,5]

It seems as if the buffer is flushed before the throttle can emit something, or after it, so the value is skipped. Has anyone succeeded in making a custom operator like this?


Solution

  • TLDR;

    Here is one approach:

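    // Imports assumed to match the question's snippet, plus merge, share and ignoreElements
    import { pipe, asyncScheduler, interval, merge } from "rxjs";
    import { publish, throttleTime, buffer, take, share, ignoreElements } from "rxjs/operators";
    
    const throttledBuffer = (throttleTimeMs) => {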
      return pipe(
        publish((observable$) => {
          const throttleCalls$ = observable$.pipe(
            // tap((v) => console.log("in throttle", v)),
            throttleTime(throttleTimeMs, asyncScheduler, {
              leading: true,
              trailing: true
            }),
            // tap(() => console.log("after throttle"))
          );
    
          const src$ = observable$.pipe(share());
          return merge(
            src$.pipe(ignoreElements()),
            src$.pipe(/* tap(console.log), */ buffer(throttleCalls$))
          );
        })
      );
    };
    
    const clicks$ = interval(50);
    clicks$.pipe(throttledBuffer(100), take(4)).subscribe({
      next: (value) => {
        console.log("p", value);
      }
    });
    
    /* 
    console output:
    
    p 
    (1) [0]
    p 
    (2) [1, 2]
    p 
    (1) [3]
    p 
    (2) [4, 5]
    */
    

    You can uncomment the taps in order to examine what's going on.

    A working demo can be found here.


    Why the above solution works and what went wrong with the previous one

    Understanding the publish operator

    First, it's important to highlight how the publish operator works:

    export function publish<T, R>(selector?: OperatorFunction<T, R>): MonoTypeOperatorFunction<T> | OperatorFunction<T, R> {
      return selector ? connect(selector) : multicast(new Subject<T>());
    }
    

    In this case, we're interested in the selector approach. But, as you can see, if you were to use publish() on its own, it would use multicast with a Subject. As a side note, publishReplay uses multicast too, but with a ReplaySubject instead of a Subject.

    The connect function is defined as follows:

    // `source` - the further-up observable in the chain
    // `subscriber` - the newly created subscriber
    /* 
    To get a better understanding of what `subscriber` and `source` really are, let's consider this quick example
    
    const o$ = new Observable(subscriber => {});
    o$.pipe(
      map(...),
    ).subscribe({ next: v => console.log('hello', v) })
    
    In the `map`'s implementation, when the `map` observable is subscribed, it will call again `source.subscribe(someSubscriber)`.
    In that case, `someSubscriber` is a `{ next: v => console.log('hello', v) }`(roughly) and `source` is `o$`.
    So, in `subscriber => {}` from `Observable`'s callback, the `subscriber` will actually be `someSubscriber`.
    */
    
    // It is a `Subject` instance
    const subject = connector();
    
    from(selector(fromSubscribable(subject))).subscribe(subscriber);
    subscriber.add(source.subscribe(subject));
    

    fromSubscribable(subject) returns an Observable instance that, when subscribed to, adds the subscriber to the subject's subscribers list. So, the argument passed to the callback function you provided to publish is that Observable instance. The point of all this is that every subscriber created inside that callback becomes part of the subject's subscribers list. Then, with subscriber.add(source.subscribe(subject)), the subject subscribes to the source, meaning that all the values coming from the source are received by the subject, which in turn sends them to all of the subscribers in that list.

    So, the gist is that the order in which you create your subscribers in that callback function is important.
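
    To make the ordering point concrete, here is a minimal sketch that uses a toy stand-in for a Subject rather than RxJS itself. `MiniSubject` and `deliveryLog` are hypothetical names introduced only for this illustration. The real Subject does much more (errors, completion, unsubscription), but the assumption modelled here is the one the explanation relies on: values are pushed synchronously to subscribers, in the order in which they subscribed.

```javascript
// Toy stand-in for a Subject (NOT the RxJS implementation): it only keeps a
// list of callbacks and calls them synchronously, in registration order.
class MiniSubject {
  constructor() {
    this.subscribers = [];
  }

  subscribe(next) {
    this.subscribers.push(next);
  }

  next(value) {
    for (const subscriber of this.subscribers) {
      subscriber(value);
    }
  }
}

const deliveryLog = [];
const subject = new MiniSubject();

// Registration order decides delivery order for every value.
subject.subscribe((v) => deliveryLog.push(`first got ${v}`));
subject.subscribe((v) => deliveryLog.push(`second got ${v}`));

subject.next(0);
subject.next(1);

console.log(deliveryLog);
// [ 'first got 0', 'second got 0', 'first got 1', 'second got 1' ]
```

    For each value, the subscriber that registered first always runs to completion before the second one is called, which is why the registration order inside the publish callback matters.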

    Why the problem occurs

    Let's now see what the problem was in the first place:

    const throttledBuffer = (throttleTimeMs) => {
      return pipe(
        publish((observable$) => {
          const throttleCalls$ = observable$.pipe(
            throttleTime(throttleTimeMs, asyncScheduler, {
              leading: true,
              trailing: true
            })
          );
    
          return observable$.pipe(buffer(throttleCalls$));
        })
      );
    };
    
    const clicks$ = interval(50);
    
    clicks$.pipe(throttledBuffer(100), take(4)).subscribe({
      next: (value) => {
        console.log("p", value);
      }
    });
    
    /* 
    console output:
    
    p
    []
    p 
    (3) [0, 1, 2]
    p 
    []
    p 
    (3) [3, 4, 5]
    */
    

    Returning observable$.pipe(buffer(throttleCalls$)) is essentially the same as doing

    observable$.pipe(buffer(throttleCalls$)).subscribe(subscriber /* #1 */);
    

    And here is, finally :), the problem. throttleCalls$ will be subscribed first, because the concept behind this can be boiled down to f(g(x)), where g(x) is called first. Because throttleCalls$ is subscribed first, the subject instance registers its subscriber as the first one in the list. As a result, subscriber #1 will be the second one. This means that when the source emits something, the subscriber created from throttleCalls$'s subscription will be the first one to receive the value, and that's why you get an empty array first.
    Remember that a Subject sends values to its subscribers synchronously. This means that immediately after the buffer has emitted [], the second subscriber receives the value and the buffer's array becomes [0]. Since the throttle window has not yet elapsed, 1 and 2 will make it in as well, and when the window eventually elapses, the buffer emits [0, 1, 2]. Then 3 is emitted, but the first subscriber to receive it is again the one that belongs to throttleCalls$, which is the buffer's notifier, so we get an empty array once more.

    So, the problem is that the order in which the subscribers are added into the Subject's subscribers list is not the one we want.
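
    The effect of that ordering can be reproduced without timers in a small sketch. Everything here is hypothetical and simplified: `createToySubject` and `runToyBuffer` are illustration-only helpers, not RxJS APIs, and the throttle is approximated by a notifier that fires only on its leading edge, assumed to be every third value (0, 3, ...). The trailing edge is deliberately left out.

```javascript
// Toy multicast source (hypothetical helper, not RxJS): subscribers are called
// synchronously, in the order they were registered.
const createToySubject = () => {
  const subscribers = [];
  return {
    subscribe: (next) => { subscribers.push(next); },
    next: (value) => { subscribers.forEach((subscriber) => subscriber(value)); }
  };
};

// Toy buffer: `collect` plays the role of the buffer's source subscription,
// `flush` plays the role of the notifier (throttleCalls$).
const runToyBuffer = (notifierFirst) => {
  const source = createToySubject();
  const emitted = [];
  let current = [];

  const collect = (value) => { current.push(value); };
  // Assumption: the leading edge of the throttle fires for values 0, 3, ...
  const flush = (value) => {
    if (value % 3 === 0) {
      emitted.push(current);
      current = [];
    }
  };

  if (notifierFirst) {
    source.subscribe(flush);
    source.subscribe(collect);
  } else {
    source.subscribe(collect);
    source.subscribe(flush);
  }

  for (let value = 0; value < 6; value++) {
    source.next(value);
  }
  return emitted;
};

const notifierFirstResult = runToyBuffer(true);
const sourceFirstResult = runToyBuffer(false);

console.log(notifierFirstResult); // [ [], [ 0, 1, 2 ] ]
console.log(sourceFirstResult);   // [ [ 0 ], [ 1, 2, 3 ] ]
```

    With the notifier registered first, the flush happens before the triggering value has been collected, so the first emission is empty. With the source registered first, the triggering value is already in the buffer when the flush happens. The second result differs from the question's expected output only because the trailing edge is not modelled here.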

    Why the solution works

    Let's see once again the solution from the TLDR section:

    const throttledBuffer = (throttleTimeMs) => {
      return pipe(
        publish((observable$) => {
          const throttleCalls$ = observable$.pipe(
            // tap((v) => console.log("in throttle", v)),
            throttleTime(throttleTimeMs, asyncScheduler, {
              leading: true,
              trailing: true
            }),
            // tap(() => console.log("after throttle"))
          );
    
          const src$ = observable$.pipe(share());
          return merge(
            src$.pipe(ignoreElements()),
            src$.pipe(/* tap(console.log), */ buffer(throttleCalls$))
          );
        })
      );
    };
    
    const clicks$ = interval(50);
    clicks$.pipe(throttledBuffer(100), take(4)).subscribe({
      next: (value) => {
        console.log("p", value);
      }
    });
    
    /* 
    console output:
    
    p 
    (1) [0]
    p 
    (2) [1, 2]
    p 
    (1) [3]
    p 
    (2) [4, 5]
    */
    

    With the bit of knowledge acquired from the previous section, let's go step by step to understand what's going on. We know the problem was the order, so we could work on that. We can get the order we want by using merge like this:

    merge(
      // `ignoreElements()` will ignore `next`, but will let `error`/`complete` notifications through
      // By providing this as the first argument, we can be sure that the emitted value
      // will **first** become part of the buffer, and only then will the `buffer`'s notifier emit
      src$.pipe(ignoreElements()),
    
      // Ensuring the `buffer`'s notifier is the **second** in the list
      src$.pipe(/* tap(console.log), */ buffer(throttleCalls$))
    );
    

    However, there is one more thing to do in order to make sure everything works as expected, which is

    const src$ = observable$.pipe(share());
    

    What share() does is add a Subject instance between the data consumers (the two arguments of merge) and the data producer (observable$).

    The reason we needed this is best explained with the help of this little diagram:

                        s1 (the `observable$` argument)
                      /  \
                    s2    throttleCalls$
                    /  \
    (1)ignoreElements    (2)buffer(throttleCalls$)
    
    s2 - the Subject we got from `share()`
    

    So, when s1 receives a value, s2 receives it first, then (1), then (2), and only then throttleCalls$. This is what we want, because the value is first added to the buffer, and only after that does the buffer's notifier (which is throttleCalls$) emit.
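
    The delivery order in that diagram can be traced with two nested toy subjects. This is only a sketch under stated assumptions: `createToySubject` and `order` are hypothetical names, the subjects are plain callback lists rather than RxJS Subjects, and the subscriptions are registered by hand in the order the diagram implies (s2 attaches to s1 as soon as the first argument of merge subscribes, and throttleCalls$ attaches to s1 afterwards).

```javascript
// Toy multicast source (hypothetical helper, not RxJS): subscribers are called
// synchronously, in the order they were registered.
const createToySubject = () => {
  const subscribers = [];
  return {
    subscribe: (next) => { subscribers.push(next); },
    next: (value) => { subscribers.forEach((subscriber) => subscriber(value)); }
  };
};

const order = [];
const s1 = createToySubject(); // stands in for the `observable$` argument
const s2 = createToySubject(); // stands in for the Subject added by share()

// s2 is the first subscriber of s1 and forwards every value to its own list.
s1.subscribe((value) => {
  order.push("s2");
  s2.next(value);
});

// The two arguments of merge both sit behind s2.
s2.subscribe(() => order.push("(1) ignoreElements"));
s2.subscribe(() => order.push("(2) buffer source"));

// The buffer's notifier subscribes directly to s1, after s2.
s1.subscribe(() => order.push("throttleCalls$"));

s1.next(0);

console.log(order);
// [ 's2', '(1) ignoreElements', '(2) buffer source', 'throttleCalls$' ]
```

    Because s2 forwards the value synchronously, everything behind it is notified before s1 moves on to its next subscriber, so the buffer's source always sees the value before the notifier does.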

    Now you might wonder, why wouldn't it work like this?

    // const src$ = observable$.pipe(share());
    return merge(
      observable$.pipe(ignoreElements()),
      observable$.pipe(/* tap(console.log), */ buffer(throttleCalls$))
    );
    

    The diagram for the above snippet would look like this:

                   s1 (the `observable$` argument)
                  /  \
    (1)ignoreElements  (2)buffer(throttleCalls$)
    

    Well, we are back to the same problem we had initially: throttleCalls$ is subscribed before the buffer's source, because (1) and (2) are completely independent. The subscribers list would (roughly) look like this:

    [
      ignoreElements,
      throttleCalls$,
      (2) - the `buffer`'s source
    ]