Search code examples
typescriptrxjsreactive-programming

With RxJS 6, how do I re-emit values from observed arrays, with a delay between each/all values?


At regular intervals, I want to poll an API that will return an array of records which will almost certainly vary in size. I want each record to display in a CSS animation which takes t amount of time. Therefore, I must buffer the API responses and release them individually, no more frequently than t, so that the animation can complete smoothly.

After much searching and trial and error, I put together this custom RxJS operator (in framework-less TypeScript). However, the double/nested concatMap has a bit of code smell. Is there a more elegant or reactive solution? Are all the inner observables managed properly (unsubscribed from)?

(This is my first custom operator, so any other feedback is welcome.)

export function recordPace (/* params to pipe */): OperatorFunction<IRecord[], IRecord> {
    // inner function automatically receives source observable
    return (source: Observable<IRecord[]>) => {
        return source.pipe(
            // First, explode the array to show one at a time.
            concatMap((records: IRecord[]) => from(records).pipe(
                // Now, for each array value, add a delay
                concatMap((record: IRecord) => of(record).pipe(
                    delay(t),
                ))
                tap((record: IRecord) => {
                    // executes once per record, no faster than every t
                }),
            )),
            tap((record: IRecord) => {
                // alternative also executes once per record, no faster than every t
            }),
        );
    };
}
MyApi.doPolling().pipe(
    recordPace(),
    recordAnimate(),
).subscribe(
    () => {},
    () => {},
    () => { console.log('done'); }
);

Solution

  • If you want to explode the array, you can use mergeAll().

    export function recordPace (/* params to pipe */): OperatorFunction<IRecord[], IRecord> {
      return (source: Observable<IRecord[]>) => {
        return source.pipe(
          // concatMap((records: IRecord[]) => from(records).pipe(
          mergeAll(),
          concatMap((record: IRecord) => of(record).pipe(
            delay(t),
          ))
          tap((record: IRecord) => {
            // executes once per record, no faster than every t
          }),
          // )),
          tap((record: IRecord) => {}),
        );
      };
    }
    

    You can use any of the mergeAll, switchAll, concatAll operators. This is because if you only want to explode the array, there is no async operation involved. Each operator mentioned above takes in a function that returns an ObservableInput

    export type ObservableInput<T> = Observable<T> | InteropObservable<T> | AsyncIterable<T> | PromiseLike<T> | ArrayLike<T> | Iterable<T>;
    

    As you can see, an array could be considered an ObservableInput and the behaviour is that if your callback function returns an array, it will send each of its items individually.

    Also, congrats on your first custom operator!