I am playing around with Rxjs, observables and maps, and I found a strange behavior for Observable.throw(error)
that I cannot explain.
If I have a Rx stream that uses a map operator, and I want to interrupt the process, I would have expected the method Observable.throw
to be appropriate, however, this does not seem to be the case.
Consider the following example:
Rx.Observable.just("test 1").subscribe(
x => console.log(x),
err => console.log(err),
() => console.log("done, no errors")
);
Now let's introduce an error, if I use the regular throw
from Javascript it works as expected:
Rx.Observable.just("test 2").map(
x => { throw new Error(x) }
).subscribe(
x => console.log(x),
err => console.log(err),
() => console.log("done - error was thrown, as expected")
);
Output:
Error: test 2 at Rx.Observable.just.map.x ((index):58) at c (rx.all.compat.js:61) at e.onNext (rx.all.compat.js:5169) (...)
But if I use Rx.Observable.throw(...), the error
callback from the following subscriber is never called, and the next
callback will be called instead with some strange object that seems to be an Rx error object.
Rx.Observable.just("test 3").map(
x => Rx.Observable.throw(x)
).subscribe(
x => console.log(x),
err => console.log(err),
() => console.log("done - it does not work... why?")
);
Output:
b_subscribe: f(a)error: "test 3" scheduler: a__proto__: g
As @Whymarrh pointed out, it seems to work fine if I use a flatMap operator instead.
The Map operator applies a function of your choosing to each item emitted by the source Observable, and returns an Observable that emits the results of these function applications.
Documentation for Observable.throw:
Returns an observable sequence that terminates with an exception, using the specified scheduler to send out the single onError message.
Does anyone know why when using Observable.throw inside the map operator the error
callback is not called, and why the process is not interrupted?
I know that I can just use the regular throw and move on, I already have a working solution, I am posting this question out of curiosity to have a better understanding of how the framework works.
Quick reminder: stackoverflow has a be nice policy.
One of the comments properly answers this question:
throw new Error(x)
throws an exception that simply bubbles to the top of the process.
x => Rx.Observable.throw(x)
just creates a structure representing an error. To the map
operator, this structure is a value like any other, to call the success handler. flatMap
on the other hand will take the structure and unwrapped it, then call the error handler.