java, reactive-programming, rx-java, reactivex

Observables executed in parallel


I was experimenting with ReactiveX zip and noticed that the observables I pass to zip are executed sequentially, one after the other. I thought the benefit of zip was that each observable passed to it is executed on its own thread, so that all of them run in parallel. Is there any way to achieve what I want? Here is my zip example:

@Test
public void testZip() {
    Observable.zip(obString(), obString1(), obString2(), (s, s2, s3) -> s.concat(s2)
                                                                     .concat(s3))
              .subscribe(System.out::println);
}

public Observable<String> obString() {
    System.out.println(Thread.currentThread().getId());
    return Observable.just("hello");
}

public Observable<String> obString1() {
    System.out.println(Thread.currentThread().getId());
    return Observable.just(" world");
}

public Observable<String> obString2() {
    System.out.println(Thread.currentThread().getId());
    return Observable.just("!");
}

Solution

  • You are looking at the wrong thing.

    The obString* methods all run on the same thread because they are executed when you call them in testZip, while the arguments to zip are being built. The println calls sit in the method bodies, not inside the observables.

    What you want to look at is what happens inside the observable. That is not possible with just alone; you would need a custom observable (for example, one built with Observable.create) and check the current thread in the body of its OnSubscribe.

    Also, you probably want to use subscribeOn to give each Observable either a dedicated new thread or a thread pool.
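
    The points above can be illustrated with a minimal sketch. It assumes RxJava 3 (the io.reactivex.rxjava3 packages; on 1.x the classes live under rx instead), and the class ZipThreads, the helper source and the THREADS set are hypothetical names introduced only for this illustration. The thread is recorded inside fromCallable, which runs at subscription time, and each source is given subscribeOn(Schedulers.io()) so zip can run them concurrently:

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class ZipThreads {

    // Names of the threads on which the source bodies actually ran.
    static final Set<String> THREADS = ConcurrentHashMap.newKeySet();

    // Hypothetical helper: the callable body runs when zip subscribes,
    // not when source(...) is called, so the thread recorded here is
    // the one chosen by subscribeOn.
    static Observable<String> source(String value) {
        return Observable.fromCallable(() -> {
            THREADS.add(Thread.currentThread().getName());
            Thread.sleep(100); // simulate slow work so the sources overlap
            return value;
        }).subscribeOn(Schedulers.io());
    }

    // Zips three sources and blocks until the combined value arrives.
    static String zipped() {
        return Observable.zip(source("hello"), source(" world"), source("!"),
                        (s1, s2, s3) -> s1.concat(s2).concat(s3))
                .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println(zipped());   // prints "hello world!"
        System.out.println(THREADS);    // thread names used by the sources
    }
}
```

    Removing the subscribeOn call from source should make all three bodies run on the calling thread, one after the other, which matches the behaviour described in the question.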