rx-java observable rx-scala

How to stop a mapped Observable on a specific condition in RxScala/RxJava?


original observable ------a-------b-------c----------d-------->....
mapped observable -----A-------B(finish)
A simple example:

  val original = Observable.just('a', 'b', 'c', 'd')
  val mapped = original.map(x => x.toUpper)
  // How can the `mapped` Observable stop emitting once 'b' is received from `original`?
  // do something
  mapped.subscribe(x => println(x)) // should print only A, B

How can I make the mapped observable complete at B, that is, on this specific condition?

UPDATE regarding the takeUntil method
takeUntil seems to be the standard answer, but my editor shows that this method takes an Observable[Any] as its parameter. Here is the definition:
def takeUntil(that: Observable[Any]): Observable[T]

If I use the following code:

  val original = Observable.just('a', 'b', 'c', 'd').takeUntil(x => x == 'b')

a compile error occurs:

Error:(74, 64) missing parameter type
  val original = Observable.just('a', 'b', 'c', 'd').takeUntil(x => x == 'b')
                                                           ^

Are we using the same package? My sbt dependency is "com.netflix.rxjava" % "rxjava-scala" % "0.20.7"


Solution

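  • Use takeWhile: it takes a predicate and completes the observable the first time the predicate returns false. The item that fails the predicate is itself not emitted.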
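
A minimal sketch of how this could look with the rxjava-scala dependency from the question. The first subscription uses takeWhile directly. The second uses takeThrough, a hypothetical helper (not part of RxScala) that wraps takeWhile with a flag so that the matching item is also emitted. The helper keeps mutable state, so it assumes one subscription per call.

```scala
import rx.lang.scala.Observable

val original = Observable.just('a', 'b', 'c', 'd')

// takeWhile completes the stream the first time the predicate is false.
// The failing item ('b') is dropped, so this prints only A.
original.takeWhile(_ != 'b').map(_.toUpper).subscribe(x => println(x))

// Hypothetical helper (not part of RxScala): also lets the matching item through.
// The flag is mutable state, so call the helper once per subscription.
def takeThrough[T](source: Observable[T])(isLast: T => Boolean): Observable[T] = {
  var seenLast = false
  source.takeWhile { x =>
    val keep = !seenLast           // still true for the matching item, so it passes
    if (isLast(x)) seenLast = true // every later item will be rejected
    keep
  }
}

// Prints A, then B. Completion is signalled only when 'c' arrives, not at 'b'.
takeThrough(original)(_ == 'b').map(_.toUpper).subscribe(x => println(x))
```

The trade-off of the helper is that the stream completes one item late: on a slow source, the subscriber sees B immediately but is notified of completion only when the next item is emitted.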