javascript rxjs reactivex reactive

RxJS not all Subscribers receive all events


I'm working on an exercise about RxJS, and something very strange is happening:

  typoStream.subscribe(x => console.log('wont get executed'));
  wordCompletedStream.subscribe(nextStream);
  typoStream.subscribe(x => console.log('will get executed'));

When the application runs, the first console.log is never printed, but the second one is.
Regardless of what the streams are and how they interact, this should never happen, right? Why does it matter when I subscribe to an observable? Shouldn't it emit each event to every subscriber anyway?

If you want to try it: http://embed.plnkr.co/xb8Yimo5RcYGPtgClYgY/

Type the displayed word correctly and you can see the "error" in action. But it doesn't happen every time - only most of the time.

Here's the stream flow: https://photos.app.goo.gl/Z4cpKzekAIuKzMF93


Solution

  • I had a play with the code you posted, and the key fix is to properly multicast the checkStream observable. You can do this with .publish().refCount(), as you did for wordStream, or with the shortcut .share(), which does the same thing.

      const checkStream = wordStream.combineLatest(inputStream).share();
    

    The reason this works is that, without it, every subscription to checkStream or to any stream derived from it (such as typoStream and wordCompletedStream) triggers its own subscription to wordStream (which is already correctly multicast, so no new request is made) and its own subscription to inputStream. Each subscriber therefore runs a separate copy of the combineLatest pipeline, with its own idea of the latest word and the latest input.

    With the .share() operator, it doesn't matter how many subscriptions are made to checkStream or its derived observables: only the first one triggers a subscription to inputStream.
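
    To see the difference in isolation, here is a small library-free sketch of the idea behind .share(). It does not use RxJS at all: makeSource and share are hypothetical stand-ins made up for illustration, and the real operator also handles completion, errors and disposal, which this ignores. It only shows that an unshared source is subscribed to once per subscriber, while a shared one is subscribed to once in total.

    ```javascript
    // Hypothetical stand-in for an observable source (NOT the RxJS API).
    // It counts how often it gets subscribed to.
    function makeSource() {
      const observers = [];
      return {
        subscriptionCount: 0,
        subscribe(observer) {
          this.subscriptionCount += 1;
          observers.push(observer);
          // Return an unsubscribe function.
          return () => {
            const i = observers.indexOf(observer);
            if (i !== -1) observers.splice(i, 1);
          };
        },
        emit(value) {
          observers.slice().forEach(observer => observer(value));
        }
      };
    }

    // Simplified model of share(): subscribe to the source when the first
    // subscriber arrives, fan values out to everyone, and unsubscribe from
    // the source when the last subscriber leaves.
    function share(source) {
      let subscribers = [];
      let disconnect = null;
      return {
        subscribe(observer) {
          subscribers.push(observer);
          if (subscribers.length === 1) {
            disconnect = source.subscribe(value =>
              subscribers.slice().forEach(s => s(value)));
          }
          return () => {
            subscribers = subscribers.filter(s => s !== observer);
            if (subscribers.length === 0 && disconnect) {
              disconnect();
              disconnect = null;
            }
          };
        }
      };
    }

    // Unshared: two subscribers cause two source subscriptions.
    const plainSource = makeSource();
    plainSource.subscribe(() => {});
    plainSource.subscribe(() => {});
    console.log(plainSource.subscriptionCount); // 2

    // Shared: two subscribers, but only one source subscription.
    const sharedSource = makeSource();
    const shared = share(sharedSource);
    const received = [];
    shared.subscribe(v => received.push('a:' + v));
    shared.subscribe(v => received.push('b:' + v));
    sharedSource.emit('x');
    console.log(sharedSource.subscriptionCount); // 1
    console.log(received);                       // both subscribers got 'x'
    ```

    In the real code, the source being subscribed to over and over is the combineLatest of wordStream and inputStream.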

    Note that after this fix, neither of the two subscribers to typoStream fires for a correctly entered word, which is what I would expect from an observable called typoStream. Both fire when an incorrect character is entered.
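
    As for why the order of the subscribe calls mattered before the fix, my best guess is re-entrant emission: completing a word makes one subscriber push '' back into inputStream while the original value is still being delivered. The sketch below models that with a hypothetical makeSubject helper (not the RxJS Subject class), assuming values are delivered synchronously and in subscription order. A subscriber registered before the resetting one ends up with '' as its latest value, while one registered after it ends up with the completed word.

    ```javascript
    // Tiny stand-in for a subject (hypothetical, NOT the RxJS class):
    // next() delivers synchronously to a snapshot of the observers,
    // in the order they subscribed.
    function makeSubject() {
      const observers = [];
      return {
        subscribe(observer) {
          observers.push(observer);
        },
        next(value) {
          observers.slice().forEach(observer => observer(value));
        }
      };
    }

    const input = makeSubject();
    const seenByFirst = [];
    const seenByLast = [];

    // Subscribed BEFORE the resetting subscriber.
    input.subscribe(v => seenByFirst.push(v));

    // Resets the input once the word is complete, similar to what the
    // nextStream handler does with inputStream.onNext('').
    input.subscribe(v => {
      if (v === 'word') input.next('');
    });

    // Subscribed AFTER the resetting subscriber.
    input.subscribe(v => seenByLast.push(v));

    input.next('word');

    console.log(seenByFirst); // 'word' first, then '' (latest value is '')
    console.log(seenByLast);  // '' first, then 'word' (latest value is 'word')
    ```

    If each subscriber has its own combineLatest pipeline, they can therefore disagree about the latest input when the next word arrives. With a single shared pipeline there is only one latest input, so the two typoStream subscribers always see the same values.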

    Forked Plunkr here

    Or see the snippet below:

    (() => {
    
      // --- UI Stuff, NO NEED TO TOUCH THESE --- //
      const wordField = $('#TotDWord');
      const inputField = $('#TotDInput');
      // ----------------------------------------- //
    
      // A stream of the user's string inputs
      const inputFieldStream = Rx.Observable.fromEvent(inputField, 'keyup')
        .map(x => x.target.value).distinctUntilChanged();
    
      // This stream represents the user's input - we don't use
      // inputFieldStream directly because we also want to send values manually
      const inputStream = new Rx.Subject();
    
    
      // Feed the stream from the field into our inputStream
      inputFieldStream.subscribe(inputStream);
    
      // A stream that allows us to manually trigger that we need a new word
      const nextStream = new Rx.Subject();
    
      // When we want the next word we need to reset the user's input
      nextStream.subscribe(() => {
        inputField.val('');
        inputStream.onNext('');
      });
    
      // This stream calls a server for a new random word every time nextStream emits an event.
      // We startWith a value to trigger the first word
      const wordStream = nextStream.startWith('')
        .flatMapLatest(getRandomWord)
        // publish & refCount share a single subscription - otherwise every subscriber to wordStream would cause a new HTTP request
        .publish().refCount();
    
      // When there is a new word, we display it
      wordStream.subscribe(word => {
        wordField.empty();
        wordField.append(word);
      });
    
      // checkStream combines the latest word with the latest user input. It emits an array like ['the word', 'the user input']
      const checkStream = wordStream.combineLatest(inputStream).share();
    
      // Emits an event if the user input is not correct
      const typoStream = checkStream.filter(tuple => {
        const word = tuple[0];
        const input = tuple[1];
        return !word.startsWith(input);
      });
    
      // When there is a typo we need a new word
      typoStream.subscribe(nextStream);
    
      // Emits an event when the user has entered the entire word correctly
      const wordCompletedStream = checkStream.filter(tuple => {
        const word = tuple[0];
        const input = tuple[1];
        return word === input;
      });
    
      /**
       * WITHOUT .share() THIS (MOST OF THE TIME) DID NOT FIRE WHEN YOU COMPLETED A WORD
       */
      typoStream.subscribe(x => console.log('wont get executed'));
    
      // Whenever the word is completed, request a new word
      wordCompletedStream.subscribe(nextStream);
    
      /**
       * WITHOUT .share() THIS FIRED WHEN YOU COMPLETED A WORD
       */
      typoStream.subscribe(x => console.log('will get executed'));
    
      // Calls a server for a random word
      // returns a promise
      function getRandomWord() {
        return $.ajax({
          // Change the URL to cause a 404 error
          url: 'https://setgetgo.com/randomword/get.php'
        }).promise();
      }
    
    })();
    <script data-require="jquery" data-semver="3.1.1" src="https://ajax.googleapis.com/ajax/libs/jquery/3.1.1/jquery.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>
    
    <div>
      <h1>Exercise: Typing of the Dead</h1>
      <div>
        Type the given word correctly and watch the console. Most of the time 1 of the 2 subscriptions on the typoStream will fire (when none should fire).
      </div>
      <br />
      <div id="TotDWord"></div>
      <input type="text" name="" id="TotDInput" value="" /><span>Highscore: </span><span id="TotDHighscore"></span>
      <div id="TotDScore"></div>
    </div>
    
    <script>
      console.clear();
    </script>