I would like to apply subscribeOn to an Observable only when a condition is true.
For example, in the code below:
Observable.just("One", "Two", "Three")
    .flatMap(v ->
        performLongOperation(v)
            .doOnNext(s -> System.out.println("processing item on thread " + Thread.currentThread().getName()))
            .subscribeOn(Schedulers.newThread()) // I want to apply this only if a condition is true; otherwise run on the main thread
    )
    .subscribe(item -> System.out.println(item));
You can store the common part of the chain in a local variable, then use an if statement to return either that chain as-is or the version with subscribeOn applied:
.flatMap(v -> {
    var o = performLongOperation(v)
        .doOnNext(s -> System.out.println("processing item on thread "
            + Thread.currentThread().getName()));
    if (condition) {
        return o.subscribeOn(Schedulers.newThread());
    }
    return o;
})
If you want something fancier, use compose() with a ternary:
.flatMap(v ->
    performLongOperation(v)
        .doOnNext(s -> System.out.println("processing item on thread "
            + Thread.currentThread().getName()))
        .compose(o -> condition ? o.subscribeOn(Schedulers.newThread()) : o)
)
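If the same check is needed in several places, the compose() variant can be pulled into a small reusable transformer. The following is only a sketch, assuming RxJava 3 (the io.reactivex.rxjava3 packages) is on the classpath. The helper name subscribeOnIf and the stand-in performLongOperation are made up for illustration and are not part of RxJava or of the original code.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableTransformer;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.List;

class ConditionalSubscribeOnDemo {

    // Hypothetical helper (not an RxJava API): returns a transformer that
    // applies subscribeOn only when the condition is true and otherwise
    // passes the upstream through unchanged.
    static <T> ObservableTransformer<T, T> subscribeOnIf(boolean condition, Scheduler scheduler) {
        return upstream -> condition ? upstream.subscribeOn(scheduler) : upstream;
    }

    // Stand-in for performLongOperation from the question (assumption):
    // tags each value with the name of the thread that produced it.
    static Observable<String> performLongOperation(String v) {
        return Observable.fromCallable(() -> v + " on " + Thread.currentThread().getName());
    }

    // Runs the pipeline and blocks until all items have been collected.
    static List<String> run(boolean condition) {
        return Observable.just("One", "Two", "Three")
            .flatMap(v -> performLongOperation(v)
                .compose(subscribeOnIf(condition, Schedulers.newThread())))
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        // Without subscribeOn, the work runs on the subscribing thread.
        run(false).forEach(System.out::println);
        // With subscribeOn, each inner Observable gets its own new thread,
        // so the order of the results is not guaranteed.
        run(true).forEach(System.out::println);
    }
}
```

When the condition is false, nothing is scheduled, so the work happens on whichever thread subscribes (the main thread only if that is where subscribe is called).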