I'm working on an exercise about RxJS, and something very strange is happening:
typoStream.subscribe(x => console.log('wont get executed'));
wordCompletedStream.subscribe(nextStream);
typoStream.subscribe(x => console.log('will get executed'));
When the application runs, the first console.log won't get printed and the second one will.
Regardless of what the streams are and how they interact - this should never happen, right? Why is it important when I subscribe to an observable - shouldn't it emit the event to every subscriber anyways?
If you want to try it: http://embed.plnkr.co/xb8Yimo5RcYGPtgClYgY/
Type the displayed word correctly and you can see the "error" in action. But it doesn't happen every time - only most of the time.
Here's the stream flow: https://photos.app.goo.gl/Z4cpKzekAIuKzMF93
I had a play with the code you posted, and the key fix is to properly multicast the checkStream observable. You can do this with .publish().refCount() like you did for wordStream, or you can use the shortcut method that does the same thing, .share().
const checkStream = wordStream.combineLatest(inputStream).share();
The reason this works is that without it, multiple subscriptions to checkStream or any streams derived from it, such as typoStream and wordCompletedStream, will each trigger a new subscription to the wordStream observable (which is correctly multicast, so no new request gets made) and the inputStream observable, which will register new event listeners on the input.
With the .share() operator, it doesn't matter how many subscriptions are made to checkStream or derived observables, only the first one will trigger a subscription to inputStream.
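To make the difference concrete, here is a toy model in plain JavaScript. It is a rough sketch of the idea and not how RxJS implements it: coldSource, emit and this share helper are hypothetical names I made up, and an "observable" here is just a function that takes an observer callback and returns an unsubscribe function.

```javascript
// Toy model only: these helpers are hypothetical and are not the RxJS API.
let sourceSubscriptions = 0;
const listeners = new Set();

// Hypothetical cold source: every subscribe call registers a new listener,
// much like attaching another event handler to the input field.
function coldSource(observer) {
  sourceSubscriptions += 1;
  listeners.add(observer);
  return () => listeners.delete(observer);
}

// Push a value to every listener currently registered on the source.
function emit(value) {
  listeners.forEach(listener => listener(value));
}

// Hypothetical share(): subscribe to the source once, on the first observer,
// fan values out to all observers, and disconnect when the last one leaves.
function share(source) {
  const observers = new Set();
  let disconnect = null;
  return observer => {
    observers.add(observer);
    if (observers.size === 1) {
      disconnect = source(value => observers.forEach(o => o(value)));
    }
    return () => {
      observers.delete(observer);
      if (observers.size === 0 && disconnect) {
        disconnect();
        disconnect = null;
      }
    };
  };
}

// Without sharing: two subscribers cause two source subscriptions.
const unsubA = coldSource(() => {});
const unsubB = coldSource(() => {});
const unsharedCount = sourceSubscriptions;
unsubA();
unsubB();

// With sharing: two subscribers cause a single source subscription.
sourceSubscriptions = 0;
const shared = share(coldSource);
const received = [];
shared(v => received.push('first:' + v));
shared(v => received.push('second:' + v));
const sharedCount = sourceSubscriptions;

emit('x');
console.log(unsharedCount, sharedCount, received);
```

In this model both shared observers see the same emission from one upstream subscription, which is the behaviour the fix relies on.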
Note that after this fix, neither of the two subscribers to typoStream will fire for a correctly entered word, which is what I would expect from an observable called typoStream. Both will fire when an incorrect character is entered.
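As a quick sanity check of that expectation, the two filter predicates can be run on their own, outside of any stream. The predicates mirror the ones in the snippet (I used === instead of ==, which makes no difference for two strings); the sample words are made up for illustration.

```javascript
// Predicates mirroring the filter callbacks on checkStream.
const isTypo = ([word, input]) => !word.startsWith(input);
const isCompleted = ([word, input]) => word === input;

// Made-up [word, input] tuples, in the shape checkStream emits.
const samples = [
  ['hello', 'hel'],   // correct prefix so far
  ['hello', 'hello'], // fully typed word
  ['hello', 'hex'],   // wrong character
];

const results = samples.map(tuple => ({
  input: tuple[1],
  typo: isTypo(tuple),
  completed: isCompleted(tuple),
}));

console.log(results);
```

Only the last tuple counts as a typo, and only the fully typed word counts as completed, so a correctly entered word should never reach a typoStream subscriber.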
Or see the snippet below:
(() => {
// --- UI Stuff, NO NEED TO TOUCH THESE --- //
const wordField = $('#TotDWord');
const inputField = $('#TotDInput');
// ----------------------------------------- //
// A stream of the user's string inputs
const inputFieldStream = Rx.Observable.fromEvent(inputField, 'keyup')
.map(x => x.target.value).distinctUntilChanged();
// This stream is used to represent the user's input - we don't use the
// inputFieldStream directly because we want to manually send values as well
const inputStream = new Rx.Subject();
// Feed the stream from the field into our inputStream
inputFieldStream.subscribe(inputStream);
// A stream that allows us to manually trigger that we need a new word
const nextStream = new Rx.Subject();
// When we want the next word we need to reset the user's input
nextStream.subscribe(() => {
inputField.val('');
inputStream.onNext('');
});
// This stream calls a server for a new random word every time the nextStream emits an event. We startWith a value to trigger the first word
const wordStream = nextStream.startWith('')
.flatMapLatest(getRandomWord)
// publish & refCount cache the result - otherwise every .map on wordStream would cause a new HTTP request
.publish().refCount();
// When there is a new word, we display it
wordStream.subscribe(word => {
wordField.empty();
wordField.append(word);
});
// checkStream combines the latest word with the latest user input. It emits an array, like this: ['the word', 'the user input']
const checkStream = wordStream.combineLatest(inputStream).share();
// Emits an event if the user input is not correct
const typoStream = checkStream.filter(tuple => {
const word = tuple[0];
const input = tuple[1];
return !word.startsWith(input);
});
// When there is a typo we need a new word
typoStream.subscribe(nextStream);
// Emits an event when the user has entered the entire word correctly
const wordCompletedStream = checkStream.filter(tuple => {
const word = tuple[0];
const input = tuple[1];
return word == input;
});
/**
* THIS WILL (MOST OF THE TIME) NOT FIRE WHEN YOU COMPLETE A WORD
*/
typoStream.subscribe(x => console.log('wont get executed'));
// Whenever the word is completed, request a new word
wordCompletedStream.subscribe(nextStream);
/**
* THIS WILL FIRE WHEN YOU COMPLETE A WORD
*/
typoStream.subscribe(x => console.log('will get executed'));
// Calls a server for a random word
// returns a promise
function getRandomWord() {
return $.ajax({
// Change the URL to cause a 404 error
url: 'https://setgetgo.com/randomword/get.php'
}).promise();
}
})();
<script data-require="jquery" data-semver="3.1.1" src="https://ajax.googleapis.com/ajax/libs/jquery/3.1.1/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>
<div>
<h1>Exercise: Typing of the Dead</h1>
<div>
Type the given word correctly and watch the console. Most of the time, one of the two subscriptions on the typoStream will fire (when none should fire).
</div>
<br />
<div id="TotDWord"></div>
<input type="text" name="" id="TotDInput" value="" /><span>Highscore: </span><span id="TotDHighscore"></span>
<div id="TotDScore"></div>
</div>
<script>
console.clear();
</script>