angular, rxjs, reactive-programming, angular-directive

RxJS: Have Observables only emit values after the previous Observable has completed


I am trying to implement an Angular structural directive that reads and stores the text content of an HTML element (and all of its children), removes that content in AfterViewInit, and then re-adds it with a delay, one character at a time, so that it appears to be typed in real time.

I do this by checking the target node's nodeType. If it is a text node, its data is backed up into an array and processing continues with the next node. If it is any other kind of node, that node is analyzed recursively as well. This way, I can also modify text that is contained in child elements of the target element.
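
For illustration, the recursive analysis described above could look roughly like the sketch below. It is not the directive's actual code: `NodeLike`, `Memento` and `collectMementos` are hypothetical names, and the nodes are plain objects standing in for DOM nodes so that the sketch runs outside a browser. Emptying each text node during the walk is also an assumption; the directive may do that in a separate step. With real DOM nodes, `childNodes` is a NodeList, so it would need `Array.from(node.childNodes)` or a similar conversion.

```typescript
// Numeric value of Node.TEXT_NODE in the DOM.
const TEXT_NODE = 3;

// Minimal structural stand-in for a DOM node (assumption, not the real DOM type).
interface NodeLike {
  nodeType: number;
  data?: string;           // present on text nodes
  childNodes?: NodeLike[]; // present on element nodes
}

// Hypothetical shape: one backed-up text node plus its original content.
interface Memento {
  node: NodeLike;
  data: string;
}

// Walk the tree depth-first, in document order. Text nodes are backed up
// and emptied; any other node is analyzed recursively.
function collectMementos(node: NodeLike, out: Memento[] = []): Memento[] {
  if (node.nodeType === TEXT_NODE) {
    out.push({ node, data: node.data ?? '' });
    node.data = ''; // assumption: clear the text here so it can be re-typed later
  } else {
    for (const child of node.childNodes ?? []) {
      collectMementos(child, out);
    }
  }
  return out;
}
```

Because the walk is depth-first, the resulting array lists the text nodes in the order in which they appear on the page, which is the order in which they should be typed back.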

To write the text back to the nodes, I am using Observables. Each node's original value is emitted one character at a time, with a delay of 60ms between characters, and appended to the node's content to simulate keyboard input delay. In my analyze method, I also record the length of the previous node's content and delay the Observable by that number * 60ms. My intention was for the next Observable to emit its values only after the previous one has finished.

However (probably due to the asynchronous nature of Observables), one Observable will often start emitting values before the previous one has fully completed. This sometimes makes the page look as if more than one line were being typed at a time, a behaviour I am trying to avoid.

This is what my Observables look like:

from(this.mementos)
  .pipe(
    concatMap(
      memento => of(memento).pipe(delay(memento.previousNodeLength * 60)) // delay Observable by the time it takes to complete the previous node
    )
  )
  .subscribe(memento =>
    from(Array.from(memento.data))
      .pipe(concatMap(text => of(text).pipe(delay(60)))) // delay each character by 60ms
      .subscribe(c => {
        memento.node.data += c;
      })
  );

Complete working Stackblitz that showcases the problem: Stackblitz Example

How can I modify my code so that only one node's content will be added at a time?


Solution

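  • Calculating the delay as n * 60 gives you an exact number, but if you simulate the same delay with n setTimeout() calls, the total won't match, because setTimeout() doesn't guarantee an exact timeout. The per-character timers can run late, so the next node can start before the previous one has finished.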

    So instead, you can remove the nested subscription and build a single chain that uses concatMap twice: the outer concatMap takes each memento in turn, and the inner one takes each character of that memento.

    from(this.mementos)
      .pipe(
        concatMap(memento => from(memento.data).pipe( // each memento
          concatMap(char => of({ node: memento.node, char }).pipe(delay(60))), // each char
        ))
      )
      .subscribe(({ node, char }) => {
        node.data += char;
      });
    

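    This will start processing the next memento only when the previous one completes, because concatMap subscribes to the next inner Observable only after the current one has completed. Each character is delayed inside the nested Observable.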

    Live demo: https://stackblitz.com/edit/angular-ivy-njfysl?file=src/app/typewriter.directive.ts
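
To make the ordering argument concrete without RxJS, here is a minimal sketch of the same idea using plain timers. It is not the answer's code: `TextNodeLike`, `Step`, `flattenSteps` and `playSteps` are hypothetical names, and plain objects stand in for text nodes. The point it illustrates is that each timer is armed only after the previous one has fired, so a late timer can delay a character but can never let a later node overtake an earlier one.

```typescript
// Plain-object stand-ins (assumptions, not DOM types).
interface TextNodeLike { data: string; }
interface Memento { node: TextNodeLike; data: string; }
interface Step { node: TextNodeLike; char: string; }

// Flatten the mementos into one ordered list of steps: every character of
// the first memento, then every character of the second, and so on.
// Note: split('') splits by UTF-16 code unit, which is fine for plain text.
function flattenSteps(mementos: Memento[]): Step[] {
  const steps: Step[] = [];
  for (const memento of mementos) {
    for (const char of memento.data.split('')) {
      steps.push({ node: memento.node, char });
    }
  }
  return steps;
}

// Apply one step per timer. The next timer is scheduled from inside the
// previous timer's callback, so the steps can never run out of order.
function playSteps(steps: Step[], delayMs: number, onDone?: () => void): void {
  let i = 0;
  const tick = (): void => {
    const step = steps[i++];
    step.node.data += step.char;
    if (i < steps.length) {
      setTimeout(tick, delayMs);
    } else if (onDone) {
      onDone();
    }
  };
  if (steps.length > 0) {
    setTimeout(tick, delayMs);
  } else if (onDone) {
    onDone();
  }
}
```

Calling `playSteps(flattenSteps(mementos), 60)` would type the nodes back one after another. The nested concatMap chain above yields the same emission order, with RxJS handling the scheduling.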