Given a stream wrapped in an Observable, I want to validate each item. If an item is invalid, I want to raise an error via Observable.throw and thereby stop all further processing.
My clunky solution is:
    import * as Rx from 'rxjs'

    inputStream.mergeMap(item => isValid(item)
        ? Rx.Observable.of(item)
        : Rx.Observable.throw(new Error("not valid"))
    )
This seems ugly because, on the happy path, it constructs an unnecessary Observable for every valid item.
Is there a better way to check items in an Observable?
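You can use a plain map and throw an exception inside it. The operator catches the exception and forwards it to subscribers as an error notification, which terminates the stream: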
    inputStream.map(item => {
        if (isValid(item)) {
            return item;
        }
        throw new Error("not valid");
    })
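
If the same check is needed in several places, the function passed to map could be factored out. The following is only a sketch: validateWith and isPositive are hypothetical names made up for illustration and are not part of RxJS, and the subscribe call in the comment assumes the same RxJS 5 style as the snippets above.

```javascript
// Hypothetical helper (not an RxJS API): turns a predicate into a function
// that returns the item unchanged when it is valid and throws otherwise.
function validateWith(isValid, message = "not valid") {
  return item => {
    if (isValid(item)) {
      return item;
    }
    throw new Error(message);
  };
}

// Hypothetical predicate, used only to illustrate the helper.
const isPositive = n => n > 0;
const check = validateWith(isPositive);

// Assuming the RxJS 5 style used above, it would be wired in like this:
//   inputStream.map(check).subscribe(
//     item => console.log(item),
//     err => console.error(err.message)
//   );
```

Because check is an ordinary function, it allocates no Observable per item and can be unit-tested without a stream.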