Does the sequential()
operator convert the ParallelFlowable
to a Flowable
stream that emits items while the remaining parallel tasks are still running, or only after they are completed?
For example:
Consider that I have 5 long-running time-varying tasks that need to be executed in parallel. I use a ParallelFlowable
that performs those tasks; after which I add a sequential()
operator; and then a take(1)
operator. Will I... :
ParallelFlowable
converts into a Flowable
?I'm hoping it should be the prior of the above, but wanted a confirmation.
If you are unsure how RxJava behaves in certain simple situations, the best thing is to write a small program or unit test with a couple of System.out.println()
or with .test()
and some .assertValues()
/.assertNotComplete()
/etc. at various places to see what is happening where and when.
source
.doOnNext(v -> System.out.println("Source sent: " + v))
// .operator1()
// .operator2()
// etc.
RxJava is a composition library which means you can often isolate the source(s), the processing logic and the end-consumer so that they can be individually tested. For example, you could replace a network call with a simple Observable.just()
emitting a constant response. In other cases, you could create a PublishSubject
and call onNext
in a step-by-step fashion checking the expected state after each of those.
PublishSubject<Integer> publishSource = PublishSubject.create();
Observable<Integer> squares = publishSource
.map(v -> v * v);
For this though, you often need to factor out the processing logic into so-called transformer methods of functions, that take the typed but otherwise unconstrained source (i.e., the PublishSubject
) and return some other type (Observable
) to be consumed by and verified by a test consumer.
public static Observable<Integer> square(Observable<Integer> source) {
return source.map(v -> v * v);
}
Observable<Integer> squared = square(Observable.just(2));
Observable<Integer> anotherSqr = square(publishSource);
Finally, you can then consume the last Observable
source or simply see what you can print to the console:
square(publishSource
.doOnNext(v -> System.out.println("Input: " + v))
)
.subscribe(
v -> System.out.println("Output: " + v),
Throwable::printStackTrace,
() -> System.out.println("Done")
);
publishSource.onNext(1);
// Input: 1
// Output: 2
publishSource.onNext(2);
// Input: 2
// Output: 4
publishSource.onComplete();
// Done
You can insert Thread.sleep()
s in case the transformation involves asynchrony or timed delays so that response to multiple inputs can be separated in your output/assertions.
(As for your wanted confirmation, sequential
doesn't wait for all items to finish, it will emit items from any of the parallel rails as soon as they become available and the downstream is ready to receive them.)