I am trying to better understand mergemap by recreating a very simple example and it is not behaving as I expected from my understanding of the official description.
In this example each successive document click outputs a single result (i.e. the output of the "of" observable, which is behaviour I would have expected from switchmap. My understanding of mergemap is that it will merge the results of multiple inner observables which does not appear to be the case here.
To examine the lifecycle of the observable I included the verbose version of the subscribe() function so I could see if complete() is called, but it isn't, so I expected the results of multiple of observerables to accumulate with successive document click and be presented in the output on each click. However what happens is a single new observable result is created each time. Why does each successive document click only emit one observable stream, and discard the previous one. Isnt this the behaviour of switchmap not mergemap?
fromEvent(document, 'click')
.pipe(mergeMap(() => of(Date.now())))
.subscribe({
next(x) {
console.log('value ', x);
},
complete() {
console.log('done');
},
});
In this example each successive document click outputs a single result (i.e. the output of the "of" observable, which is behavior I would have expected from switchmap.)
In this example, switchMap
and mergeMap
have the same behavior.
Here's some code you can play around with to see where the two differ. Try this with both mergeMap
, switchMap
, and hey, why not try concatMap
too?.
// Lets pretend we're grabbing the date from a server.
// The server is slow, so this takes 3 seconds
function getDate(): Observable<number> {
return timer(3000).pipe(
map(_ => Date.now())
);
}
fromEvent(document, 'click').pipe(
take(5),
mergeMap(getDate)
).subscribe({
next: x => console.log('value ', x),
complete: () => console.log('done')
});
You'll notice how mergeMap keeps every incoming 'click' from the document. If you 'click' twice, there will be two calls to getDate
and both calls will be kept active at the same time. 3 seconds later the first call will complete and emit a number and then some moments later the second call will complete and emit a number.
Once you've clicked 5 times and all the resulting calls to getDate
complete, you'll see your stream is done.
Try this with switchMap
and you'll notice that if you click more than once in a 3 second interval, the oldest click is dropped in favor of the newest one. switchMap
will only allow a single inner observable (call to getDate) to be active at a time, so it cancels old ones and switches over to the new ones.
This all becomes more interesting when dealing with observables that emit more than once.
the merge in mergeMap
doesn't mean "merge the items these streams emit." It means "merge these streams".
Part 1 (of 3) By preserve I mean that of() returns a distinct observable that can be subscribed to. e.g. I can do "of("value").subscribe(val => console.log(val))" to give output value. If I repeat subscribtions to that same observable I will get repeated identical outputs. If that same observable is then placed as the inner observable of a mergemap() why isnt it preserved on subsequent invocations?
For each value emitted by the source observable, mergeMap returns a new inner observable exactly once.
These two observables have the same behavior:
// Turn 3 observables into a single observable.
// merged$ will (as a single stream) emit all the values
// that it's inner streams emit)
const merged$ = merge(
functionReturningObservable(1),
functionReturningObservable(2),
functionReturningObservable(3)
)
// Turn the 3 values from the source observables into merged
// inner observables.
// merged$ will (as a single stream) emit all the values
// that it's inner streams emit)
const merged$ = from([1,2,3]).pipe(
mergeMap(v => functionReturningObservable(v))
)
Part 2 (orf 3) Why am I not seeing an accumulation of "of" observables every time I click the mouse in the example above? if mergemap merges streams where did the previous stream go? If I replace the inner observable "of" with an "interval it behaves exactly as I would expect and each click accumulates a new interval observable stream.
If of("value")
emits one value and completes. If merged with another stream, of("value")
still just emits a single value and completes. interval
emits many values and never completes, so you should expect any observable that an interval is merged into to emit many values and never complete.
merge(
interval(50).pipe(take(5)),
of("hello world"),
).subscribe(console.log);
emits:
"hello world"
0
1
2
3
4
Neither merge
nor mergeMap
will unsubscribe
and then resubscribe
to an inner observable. They both subscribe once and will let the inner observable naturally complete.
Part 3(of 3) You said "Because mergeMap merges, it will not drop the date.". This post exists because mergmap appears to be dropping the date of the previous invocation, where I expected an accumulation of streams (as seen if I replace of with interval)
If you're not seeing the date emitted on the merged observable, then there's likely a bug in your code. While you shouldn't see a date re-emitted, you should still see it once the same way you would with the inner observable on it's own (that's what it means to merge).
Here's a new operator that resubscribes to every inner observable every time a new value is emitted by the source. Not sure if that's helpful, but in case you'd like to play around with the differences:
function resubscribingMergeMap<T, R>(
project: (v: T) => Observable<R>
): OperatorFunction<T, R>{
return pipe(
map(project),
scan(
(acc, curr) => [curr, ...acc],
[] as Observable<R>[]
),
mergeMap(vs => merge(...vs))
);
}
// Here it is in use with your earlier example:
fromEvent(document, 'click').pipe(
take(5),
resubscribingMergeMap(_ => of(Date.now()))
).subscribe({
next: x => console.log('value ', x),
complete: () => console.log('done')
});