Flux<Long> flux1 = Flux
.<Long>create(fluxSink -> {
for (long i = 0; i < 20; i++) {
fluxSink.next(i);
}
})
.filter(aLong -> aLong % 2 == 0)
.doOnNext(aLong -> System.out.println("flux 1 : " + aLong));
Flux<Long> flux2 = Flux
.<Long>create(fluxSink -> {
for (long i = 0; i < 20; i++) {
fluxSink.next(i);
}
})
.filter(aLong -> aLong % 2 == 1)
.doOnNext(aLong -> System.out.println("flux 2 : " + aLong));
Flux.merge(flux1, flux2)
.doOnNext(System.out::println)
.then()
.block();
Create two Flux<Long>
like upper code.
flux1 create even number stream (0,2,4,6,8 ...) flux2 create odd number stream (1,3,5,7,9 ...)
i expected when merge this 2 flux1 and flux2 work like
0,1,2,3,4 ...
or 0,2,1,3,4..
depends on computing power
but always spend flux1 and spend flux2 (flux1 start)0,2,4,6,8, ... 16,18,(flux1 end)(flux2 start)1,3,5,7 ... 17,19
how to subscribe multiple flux eagerly event?
Both streams run on the same thread. When you subscribe flux1
starts pushing data until it's finished. Only then the thread is free for flux2
to continue. The merge
operator emits values in the order they arrive. It doesn't switch between the first and the second stream.
If you want the streams run concurrently you need to run them on different threads, e.g. by using the publishOn
operator.
Flux<Long> flux1 = Flux
.<Long>create(fluxSink -> {
for (long i = 0; i < 20; i++) {
fluxSink.next(i);
}
})
.publishOn(Schedulers.newSingle("thread-x")
.filter(aLong -> aLong % 2 == 0)
.doOnNext(aLong -> System.out.println("flux 1 : " + aLong));