Search code examples
rxjsreactivex

Rx.Observable validate items in stream (pass or throw)


Given an Stream within an Observable, I want to validate/check each item. In case one is broken I want to throw an error via Observable.throw, hence break all further processing.

My clunky solution would be

import * as Rx from 'rxjs'

inputStream.mergeMap(item => (isValid(item))
    ? Rx.Observable.of(item)
    : Rx.Observable.throw(new Error("not valid"))
)

This seems ugly, as it constructs for the positive flow a bunch of unnecessary Observables.

Is there a better way to check items in an Observable?


Solution

  • You can use just normal map and throw an exception inside it:

    inputStream.map(item => {
      if (isValid(item)) {
        return item;
      }
      throw new Error("not valid");
    })