parallel-processing rx-java flowable

How does sequential() in ParallelFlowable work?


Does the sequential() operator convert the ParallelFlowable to a Flowable stream that emits items while the remaining parallel tasks are still running, or only after they are completed?

For example:
Suppose I have 5 long-running tasks of varying duration that need to be executed in parallel. I use a ParallelFlowable to perform those tasks, then add a sequential() operator, and then a take(1) operator. Will I:

  • Get the first item while the remaining parallel tasks are still executing, or
  • Need to wait until all the tasks complete, after which the ParallelFlowable converts into a Flowable?

I'm hoping it is the former, but wanted confirmation.


Solution

  • If you are unsure how RxJava behaves in certain simple situations, the best thing is to write a small program or unit test with a couple of System.out.println() or with .test() and some .assertValues()/.assertNotComplete()/etc. at various places to see what is happening where and when.

    source
    .doOnNext(v -> System.out.println("Source sent: " + v))
    // .operator1()
    // .operator2()
    // etc.
    

    RxJava is a composition library, which means you can often isolate the source(s), the processing logic and the end-consumer so that they can be tested individually. For example, you could replace a network call with a simple Observable.just() emitting a constant response. In other cases, you could create a PublishSubject and call onNext step by step, checking the expected state after each call.

     PublishSubject<Integer> publishSource = PublishSubject.create();
    
     Observable<Integer> squares = publishSource
     .map(v -> v * v);
    

    For this, though, you often need to factor out the processing logic into so-called transformer methods or functions that take the typed but otherwise unconstrained source (i.e., the PublishSubject) and return some other type (Observable) to be consumed and verified by a test consumer.

     public static Observable<Integer> square(Observable<Integer> source) {
         return source.map(v -> v * v);
     }
    
     Observable<Integer> squared = square(Observable.just(2));
    
     Observable<Integer> anotherSqr = square(publishSource);
    

    Finally, you can consume the resulting Observable with a test consumer, or simply print what happens to the console:

     square(publishSource
         .doOnNext(v -> System.out.println("Input: " + v))
     )
     .subscribe(
         v -> System.out.println("Output: " + v),
         Throwable::printStackTrace, 
         () -> System.out.println("Done")
     );
    
     publishSource.onNext(1);
    
     // Input: 1
     // Output: 1
    
     publishSource.onNext(2);
    
     // Input: 2
     // Output: 4
    
     publishSource.onComplete();
    
     // Done
    

    You can insert Thread.sleep()s in case the transformation involves asynchrony or timed delays, so that the responses to multiple inputs can be told apart in your output/assertions.
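
The console walk-through above can also be written as assertions with .test(), as suggested at the start of this answer. The following is a minimal sketch, assuming RxJava 3 package names (for RxJava 2, the packages would start with io.reactivex instead); the class name SquareTest and the method runStepByStep are made up for illustration.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.subjects.PublishSubject;

public class SquareTest {

    // Same transformer method as shown earlier in this answer.
    public static Observable<Integer> square(Observable<Integer> source) {
        return source.map(v -> v * v);
    }

    // Hypothetical helper: pushes items one at a time and asserts the
    // observed state after each signal.
    static TestObserver<Integer> runStepByStep() {
        PublishSubject<Integer> publishSource = PublishSubject.create();
        TestObserver<Integer> to = square(publishSource).test();

        // Nothing has been emitted yet.
        to.assertEmpty();

        publishSource.onNext(1);
        to.assertValues(1).assertNotComplete();

        publishSource.onNext(2);
        to.assertValues(1, 4).assertNotComplete();

        publishSource.onComplete();
        // All values received, no error, completed.
        to.assertResult(1, 4);

        return to;
    }

    public static void main(String[] args) {
        runStepByStep();
        System.out.println("All assertions passed");
    }
}
```

Because PublishSubject delivers each onNext synchronously to its subscribers, the assertions can follow each call directly, without any Thread.sleep().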

    (As for the confirmation you asked for: sequential() doesn't wait for all items to finish. It emits items from any of the parallel rails as soon as they become available and the downstream is ready to receive them.)
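
To check this yourself in the way recommended above, something like the following could be used. It is only a sketch under stated assumptions: RxJava 3 package names, a made-up slowTask in which task n blocks for n * 300 ms, and Schedulers.io() chosen so that each of the 5 rails gets its own thread. The class and method names are hypothetical.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class SequentialTakeDemo {

    // Hypothetical stand-in for a long-running task: task n blocks for n * 300 ms.
    static int slowTask(int n) {
        try {
            Thread.sleep(n * 300L);
        } catch (InterruptedException e) {
            // take(1) cancels the other rails, which may interrupt their threads;
            // restore the flag and return normally instead of signaling an error.
            Thread.currentThread().interrupt();
        }
        return n;
    }

    // Returns { first item received, milliseconds elapsed until it arrived }.
    static long[] firstResult() {
        long start = System.nanoTime();

        int first = Flowable.range(1, 5)
            .parallel(5)                       // one rail per task
            .runOn(Schedulers.io())            // each rail runs on its own thread
            .map(SequentialTakeDemo::slowTask)
            .sequential()                      // merge the rails back into a Flowable
            .take(1)                           // keep only the first item to arrive
            .blockingFirst();

        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        return new long[] { first, elapsedMs };
    }

    public static void main(String[] args) {
        long[] result = firstResult();
        System.out.println("First item: " + result[0] + " after " + result[1] + " ms");
    }
}
```

If sequential() waited for every rail, the elapsed time could not be shorter than the slowest task (1500 ms with these delays). With the behavior described above, the item from the fastest task should arrive well before that, after which take(1) cancels the remaining rails.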