observable reactive-programming rx-java3

RxJava subscribeOn conditionally


I would like to apply subscribeOn to an Observable only when a condition is true.

For example, in the code below:

Observable.just("One", "Two", "Three")
    .flatMap(v ->
            performLongOperation(v)
            .doOnNext(s -> System.out.println("processing item on thread " + Thread.currentThread().getName()))
            .subscribeOn(Schedulers.newThread()) //I want to apply this if a condition is true. Otherwise run on the main thread
    )
    .subscribe(item -> System.out.println(item));

Solution

  • Introduce a local variable for the common part of the chain, then use an if statement to return either that common part or the version with subscribeOn applied.

    .flatMap(v -> {
    
        var o = performLongOperation(v)
                .doOnNext(s -> System.out.println("processing item on thread " 
                     + Thread.currentThread().getName()));
    
        if (condition) {
            return o.subscribeOn(Schedulers.newThread());
        }
        return o;
    })
    

    If you want something fancier, use compose() with a ternary expression:

    .flatMap(v ->
                performLongOperation(v)
                .doOnNext(s -> System.out.println("processing item on thread " 
                      + Thread.currentThread().getName()))
                .compose(o -> condition ? o.subscribeOn(Schedulers.newThread()) : o)
        )
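
The compose() variant above can be pulled into a small reusable helper. The following is a minimal sketch, assuming RxJava 3 is on the classpath; the class name, the subscribeOnIf helper, the run method, and the body of performLongOperation are hypothetical stand-ins, not part of the original question or of RxJava itself.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableTransformer;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.List;

public class ConditionalSubscribeOn {

    // Hypothetical helper: applies subscribeOn only when 'condition' is true,
    // otherwise returns the upstream Observable unchanged.
    static <T> ObservableTransformer<T, T> subscribeOnIf(boolean condition, Scheduler scheduler) {
        return upstream -> condition ? upstream.subscribeOn(scheduler) : upstream;
    }

    // Stand-in for performLongOperation from the question (assumed shape):
    // tags each value with the name of the thread that produced it.
    static Observable<String> performLongOperation(String v) {
        return Observable.fromCallable(() -> v + " on " + Thread.currentThread().getName());
    }

    // Runs the pipeline and blocks until all items have been collected.
    static List<String> run(boolean condition) {
        return Observable.just("One", "Two", "Three")
                .flatMap(v -> performLongOperation(v)
                        .compose(subscribeOnIf(condition, Schedulers.newThread())))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        run(false).forEach(System.out::println); // inner sources run on the calling thread
        run(true).forEach(System.out::println);  // inner sources run on new threads
    }
}
```

When the condition is false, no scheduler is involved, so each inner Observable runs on whichever thread subscribes to the chain (the main thread only if that is where subscribe is called). When it is true, the inner sources run concurrently, so the order in which flatMap emits them is not guaranteed.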