I am trying to upgrade from RxJava 1 to RxJava 2. My old code has a method like the one below:
private Observable<Integer> reversRange(int from, int to) {
    Stream<Integer> intStream = Stream.iterate(to, p -> p - 1);
    return Observable.from(() -> intStream.iterator())
            .takeWhile(n -> n > from)
            .map(n -> n);
}
But in RxJava 2 I can no longer use from. What would be the equivalent of this code?
I have found in this question that the replacement is fromIterable, but I do not know how to use it with a Stream.
Here is another example. The solution should work not only for a range but for any infinite stream.
private Observable<Integer> numbers() {
    Stream<Integer> intStream = Stream.iterate(0, p -> p + 1);
    return Observable.from(() -> intStream.iterator());
}
Use the generate() function:
This is Kotlin code (an extension function), but for Java you only need to change the lambda syntax slightly. It works with any stream.
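import io.reactivex.Flowable
import io.reactivex.functions.BiConsumer // RxJava's BiConsumer, not java.util.function.BiConsumer
import java.util.concurrent.Callable
import java.util.stream.Stream

fun <T> Stream<T>.toFlowable(): Flowable<T> {
    return Flowable.generate(
        // State: the stream's iterator, obtained on subscription
        Callable { iterator() },
        // Called once per requested item: emit the next element or complete
        BiConsumer { ite, emitter ->
            if (ite.hasNext()) {
                emitter.onNext(ite.next())
            } else {
                emitter.onComplete()
            }
        }
    )
}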
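As for why this copes with infinite streams: as far as I understand generate(), the callback is invoked once per item that downstream asks for, so the iterator is only advanced on demand. The sketch below imitates that contract in plain Java, without RxJava on the classpath. MiniEmitter, drive and step are made-up names for illustration, not RxJava types; they only show the one-item-per-call idea.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Stream;

public class GenerateSketch {

    // Hypothetical stand-in for an emitter; NOT an RxJava type.
    static final class MiniEmitter<T> {
        final List<T> received = new ArrayList<>();
        boolean completed;

        void onNext(T item) { received.add(item); }
        void onComplete() { completed = true; }
    }

    // Hypothetical driver: calls the generator once per requested item
    // and stops early if the generator signals completion.
    static <T> MiniEmitter<T> drive(Iterator<T> state,
                                    BiConsumer<Iterator<T>, MiniEmitter<T>> generator,
                                    int requested) {
        MiniEmitter<T> emitter = new MiniEmitter<>();
        for (int i = 0; i < requested && !emitter.completed; i++) {
            generator.accept(state, emitter);
        }
        return emitter;
    }

    // Same callback shape as in the answer: emit one item, or complete.
    static <T> void step(Iterator<T> ite, MiniEmitter<T> emitter) {
        if (ite.hasNext()) {
            emitter.onNext(ite.next());
        } else {
            emitter.onComplete();
        }
    }

    public static void main(String[] args) {
        // Infinite stream: only the four requested items are ever pulled.
        MiniEmitter<Integer> infinite =
                drive(Stream.iterate(0, p -> p + 1).iterator(), GenerateSketch::step, 4);
        System.out.println(infinite.received + " " + infinite.completed);

        // Finite stream: the third call finds no element and completes.
        MiniEmitter<String> finite =
                drive(Stream.of("a", "b").iterator(), GenerateSketch::step, 4);
        System.out.println(finite.received + " " + finite.completed);
    }
}
```

Running main prints `[0, 1, 2, 3] false` and then `[a, b] true`: the infinite stream is never drained, and the finite one ends with a completion signal.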
You can also use Observable if you prefer, but I don't see why you should.
fun <T> Stream<T>.toObservable(): Observable<T> {
    return Observable.generate(
        Callable { iterator() },
        BiConsumer { ite, emitter ->
            if (ite.hasNext()) {
                emitter.onNext(ite.next())
            } else {
                emitter.onComplete()
            }
        }
    )
}
In Java, I think it would be something like:
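import io.reactivex.Observable;
import java.util.stream.Stream;

public <T> Observable<T> streamToObservable(Stream<T> stream) {
    return Observable.generate(
            // State: the stream's iterator, obtained on subscription
            () -> stream.iterator(),
            // Called once per item: emit the next element or complete
            (ite, emitter) -> {
                if (ite.hasNext()) {
                    emitter.onNext(ite.next());
                } else {
                    emitter.onComplete();
                }
            }
    );
}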
Your code would then become:
private Observable<Integer> numbers() {
    Stream<Integer> intStream = Stream.iterate(0, p -> p + 1);
    return streamToObservable(intStream);
}
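One caveat worth keeping in mind: a java.util.stream.Stream can be traversed only once, and iterator() counts as a terminal operation. An Observable built from a single Stream instance would therefore be expected to fail on a second subscription, because the state callback would ask the same Stream for another iterator. The plain-Java sketch below (no RxJava needed) shows the underlying behaviour and one possible workaround, which is to pass a Supplier that builds a fresh Stream each time. The names SingleUseStreamDemo, secondIteratorFails and firstItems are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class SingleUseStreamDemo {

    // Returns true if asking the same Stream for a second iterator fails.
    static boolean secondIteratorFails() {
        Stream<Integer> stream = Stream.iterate(0, p -> p + 1);
        stream.iterator();      // first terminal operation: fine
        try {
            stream.iterator();  // second terminal operation on the same Stream
            return false;
        } catch (IllegalStateException e) {
            return true;        // the Stream has already been operated upon
        }
    }

    // Hypothetical helper: builds a fresh Stream on every call and pulls
    // at most `count` items from its iterator.
    static <T> List<T> firstItems(Supplier<Stream<T>> source, int count) {
        Iterator<T> it = source.get().iterator();
        List<T> out = new ArrayList<>();
        while (out.size() < count && it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(secondIteratorFails());

        // A Supplier can be "replayed" because each call creates a new Stream.
        Supplier<Stream<Integer>> numbers = () -> Stream.iterate(0, p -> p + 1);
        System.out.println(firstItems(numbers, 3));
        System.out.println(firstItems(numbers, 3));
    }
}
```

This prints `true`, then `[0, 1, 2]` twice. If you need to subscribe more than once, the same idea should carry over to streamToObservable: accept a Supplier<Stream<T>> and call supplier.get().iterator() inside the state callback instead of capturing one Stream.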