Search code examples
javamultithreadingrx-javarx-java2

RxJava interrupt parallel execution when first result found


I am running some service working with delay in parallel. The task is to not wait while all services have finished execution.

    Observable.just(2, 3, 5)
                .map(delay -> serviceReturningSingleWithDelay(delay))
                .toList()
                .flatMap(list ->
                        Single.zip(list, output -> Arrays.stream(output)
                                .map(delay -> (Integer) delay)
                                .filter(delay -> delay == 3)
                                .findFirst()
                                .orElse(0)
                        ))
                .subscribe(System.out::println);
 private Single<Integer> serviceReturningSingleWithDelay(Integer delay) {
        return Single.just(delay)
                .delay(delay, TimeUnit.SECONDS)
                .doOnSuccess(s -> System.out.printf("Delay %d: Thread : %s \n", delay, Thread.currentThread().getName()));
    }

Now my output is:

Delay 2: Thread : RxComputationThreadPool-1 
Delay 3: Thread : RxComputationThreadPool-2 
Delay 5: Thread : RxComputationThreadPool-3 
3

The desired result is to obtain filtered value - 3 before RxComputationThreadPool-3 thread finished execution. I will be thankful for any ideas.


Solution

  • If you want to run them all in parallel and exit when you receive value 3, you don't need to use zip. Rather use takeWhile to interrupt your observable like the following :

    Observable.just(2, 3, 5)
              .flatMapSingle(this::serviceReturningSingleWithDelay)
              .takeWhile(e -> e != 3)
              .subscribe(System.out::println);
    

    And if you want the 3 value use takeUntil(e -> e == 3) instead of takeWhile(e -> e != 3)