Search code examples
javajava-streamrx-javarx-java2reactive-streams

How to create Observable<Integer> from infinite Stream<Integer> using RxJava2?


I am trying to upgrade RxJava from 1 to 2. In my old code I have a method like below:

private Observable<Integer> reversRange(int from, int to) {
    Stream<Integer> intStream = Stream.iterate(to, p -> p - 1);
    return Observable.from(() -> intStream.iterator())
            .takeWhile(n -> n > from)
            .map(n -> n );
}

But now in RxJava 2 I can not use from. What would be equivalent of this code? I have found in this question that it is fromIterable but I do not know how to use it with Stream.

Or other example, this should not be only for range but for any infinite stream.

private Observable<Integer> numbers() {
    Stream<Integer> intStream = Stream.iterate(0, p -> p + 1);
    return Observable.from(() -> intStream.iterator());
}

Solution

  • Use the generate() function:

    this is kotlin code (an extension function), but you just need to change the lambda slightly. And this works with any stream.

    fun <T> Stream<T>.toFlowable(): Flowable<T> {
      return Flowable.generate(
        Callable { iterator() },
        BiConsumer { ite, emitter ->
          if (ite.hasNext()) {
            emitter.onNext(ite.next())
          } else {
            emitter.onComplete()
          }
        }
      )
    }
    

    you can also use Observable if you prefer, but I don't see why you should.

    fun <T> Stream<T>.toObservable(): Observable<T> {
      return Observable.generate(
        Callable { iterator() },
        BiConsumer { ite, emitter ->
          if (ite.hasNext()) {
            emitter.onNext(ite.next())
          } else {
            emitter.onComplete()
          }
        }
      )
    }
    

    I think in java will be something like:

    public <T> Observable<T> streamToObservable(Stream<T> stream) {
      return Observable.generate(
        () -> stream.iterator(),
        (ite, emitter) -> {
          if (ite.hasNext()) {
            emitter.onNext(ite.next());
          } else {
            emitter.onComplete();
          }
        }
      );
    }
    

    and so your code would become:

    private Observable<Integer> numbers() {
        Stream<Integer> intStream = Stream.iterate(0, p -> p + 1);
        return streamToObservable(intStream);
    }