Tags: android, observable, rx-java, rx-java2

RxJava 2 force complete chain with infinite observable


I have an observable chain with an infinite observable at the top and finite observables after it, like this:

repo.infinitGetItems()
      .switchMap(items -> Observable
                          .just(items)
                          // inner parameter renamed: a lambda parameter cannot shadow the outer "items"
                          .flatMap(i -> repo.nonInfinitObs(i)));

What I want is for the whole chain to complete when repo.nonInfinitObs sends its onComplete event. Right now it does not complete, because repo.infinitGetItems() is still alive.

Is there a way to force completion of the whole chain in RxJava 2?


Solution

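  • You can stop the main sequence with takeUntil and a signal from outside the flow. When the inner observable completes, it completes the stop subject; takeUntil then ends the infinite source, and switchMap completes once its last inner observable has finished: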

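    // Flow-external signal used only to end the infinite upstream
    PublishSubject<Integer> stop = PublishSubject.create();
    
    repo.infinitGetItems()
      .takeUntil(stop)                  // ends the upstream once stop terminates
      .switchMap(items -> repo.nonInfinitObs(items)
                          .doOnComplete(() -> stop.onComplete())  // the inner completion triggers the stop
      );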
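
For anyone who wants to try this outside a real project, here is a minimal, self-contained sketch of the same pattern. It assumes RxJava 2 (the io.reactivex packages) is on the classpath. The names ForceCompleteDemo, finiteSource and buildChain are made up for illustration: finiteSource stands in for repo.nonInfinitObs, and a PublishSubject that never completes stands in for repo.infinitGetItems(). The stop subject is created inside Observable.defer so that each subscription gets its own; that is a design choice of this sketch, not something the answer above requires.

```java
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;

public class ForceCompleteDemo {

    // Hypothetical stand-in for repo.nonInfinitObs(items): emits two values, then completes.
    static Observable<String> finiteSource(int item) {
        return Observable.just(item + "-a", item + "-b");
    }

    // Wraps a (possibly infinite) upstream so the whole chain completes
    // as soon as one inner finite source completes.
    static Observable<String> buildChain(Observable<Integer> infinite) {
        return Observable.defer(() -> {
            // One stop signal per subscription (assumption of this sketch).
            PublishSubject<Integer> stop = PublishSubject.create();
            return infinite
                    .takeUntil(stop)
                    .switchMap(item -> finiteSource(item)
                            .doOnComplete(stop::onComplete));
        });
    }

    public static void main(String[] args) {
        // A subject that is never completed plays the role of the infinite source.
        PublishSubject<Integer> infinite = PublishSubject.create();

        List<String> received = new ArrayList<>();
        boolean[] completed = {false};

        buildChain(infinite).subscribe(
                received::add,
                Throwable::printStackTrace,
                () -> completed[0] = true);

        infinite.onNext(1); // the inner source runs to completion and triggers stop
        infinite.onNext(2); // nobody is listening any more, so this is dropped

        System.out.println("received  = " + received);
        System.out.println("completed = " + completed[0]);
        System.out.println("upstream still subscribed = " + infinite.hasObservers());
    }
}
```

In this sketch the inner source is synchronous, so the chain finishes during the first onNext call. With an asynchronous inner source the mechanism is the same, but a newer upstream item can still replace the running inner observable (that is what switchMap does) before the stop signal fires.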