I'm trying to understand what the difference would be between these two observables. The only difference in code is this:
/**
* Inside rxjs pipe
*/
if(typeof x === 'number' && x > 3) {
return of(x);
} else {
return EMPTY;
}
vs:
.filter(typeof x === 'number' && x > 3)
The test I'm running:
const a$ = from([1, 6, '4']).pipe(
tap(console.log),
flatMap((x) => {
if (typeof x === 'number') {
if (x > 3) {
return of(x);
}
return EMPTY;
}
return EMPTY;
}),
tap(console.log)
);
const sub_a = a$.subscribe(
(x) => { console.log(x, 'success'); done(); },
(e) => { console.log(e, 'error'); done(); },
() => { console.log('complete'); sub_a.unsubscribe(); done(); }
);
and:
const b$ = from([2, 5, '8']).pipe(
tap(console.log),
filter(x => typeof x === 'number' && x > 3),
tap(console.log)
);
const sub_b = b$.subscribe(
(x) => { console.log(x, 'success'); done(); },
(e) => { console.log(e, 'error'); done(); },
() => { console.log('complete'); sub_b.unsubscribe(); done(); }
);
for both of them I get the first value logged once (before the filter/flatMap), the second value logged twice from the taps and once with "complete", and the third one once.
I thought the difference would be that emitting EMPTY
would cause the observable to close entirely but subsequent values are still seen through the pipe.
I've done the same with a Subject
and the only difference was that the Subject
s didn't emit Complete
which was to be expected.
There could be a difference if Observable returned from flatMap
have different scheduler, but in your example visible behavior should be the same. Usually this can happen if you rely on side effects, which is generally discouraged.
Here is example when asyncScheduler
change behavior (values printed after creation in the second example):
const { of, asyncScheduler, EMPTY, from } = rxjs; // = require("rxjs")
const { filter, flatMap } = rxjs.operators; // = require("rxjs/operators")
const items$ = from([1, 2, 3, 4, 5]);
console.log("------------ SYNC");
const sync$ = items$.pipe(
filter(v => v % 2 === 0)
);
sync$.subscribe(e => console.log(e));
console.log("after sync");
console.log("------------ ASYNC");
items$.pipe(
flatMap(v => v % 2 === 0 ? of(v, asyncScheduler) : EMPTY)
).subscribe(e => console.log(e));
console.log("after async");
<script src="https://unpkg.com/rxjs@6.5.2/bundles/rxjs.umd.min.js"></script>