Let's say we have a source Observable of Ints:
val source:Observable[Int]
I would like to create another Observable that emits only those values whose absolute difference from the first value emitted by source is greater than 10:
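def detect() = Observable[Int](
  subscriber =>
    if (!subscriber.isUnsubscribed) {
      // Holds the first value emitted by source
      var start: Option[Int] = None
      source.subscribe(
        item => {
          if (start.isEmpty) {
            start = Option(item)
          }
          else {
            // Emit the current item, not the stored first value
            start.filter(v => Math.abs(item - v) > 10).foreach {
              _ => subscriber.onNext(item)
            }
          }
        },
        // Forward termination only when source itself terminates
        e => subscriber.onError(e),
        () => subscriber.onCompleted()
      )
    }
)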
Here I've used a var, start, to hold the first value of the source Observable.
Is there a way to simplify this code? I don't like the approach of assigning a value to a var.
Here is what I came up with:
import rx.lang.scala.Observable
val source = Observable.from(List(5, 2, 3, 16, -40, 2, -70, 50))
source.scan(Option.empty[(Int, Int)]) { (acc, next) =>
  acc.map(_.copy(_2 = next)) orElse Some((next, next))
}.collect {
  case Some((start, current)) if math.abs(start - current) > 10 => current
}.subscribe(x => println(x))
This prints:
16
-40
-70
50
Basically, scan keeps an accumulator that is either uninitialized (None) or holds a pair: the first value and the latest element emitted from source. Then we collect only those elements that meet your predicate.
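To see how the accumulator evolves without pulling in RxScala, here is a rough sketch of the same idea on a plain List, using scanLeft in place of scan. The object name ScanSketch, the helper farFromFirst and its threshold parameter are made up for illustration and are not part of any library.

```scala
// Plain-collections analogue of the scan/collect pipeline (standard library only).
object ScanSketch {
  // Hypothetical helper; threshold defaults to 10 as in the question.
  def farFromFirst(values: List[Int], threshold: Int = 10): List[Int] =
    values
      .scanLeft(Option.empty[(Int, Int)]) { (acc, next) =>
        // Keep the first value fixed and replace the current one;
        // on the very first element, seed the pair with (next, next).
        acc.map(_.copy(_2 = next)) orElse Some((next, next))
      }
      .collect {
        // The initial None and any values too close to the first one are dropped here.
        case Some((start, current)) if math.abs(start - current) > threshold => current
      }

  def main(args: Array[String]): Unit = {
    val input = List(5, 2, 3, 16, -40, 2, -70, 50)
    println(farFromFirst(input)) // List(16, -40, -70, 50)
  }
}
```

The intermediate states for this input are None, Some((5, 5)), Some((5, 2)), Some((5, 3)), Some((5, 16)) and so on, so the first value is carried along unchanged and no var is needed.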