How to eagerly merge two Flux?


Flux<Long> flux1 = Flux
        .<Long>create(fluxSink -> {
            for (long i = 0; i < 20; i++) {
                fluxSink.next(i);
            }
            fluxSink.complete(); // signal completion; otherwise block() never returns
        })
        .filter(aLong -> aLong % 2 == 0)
        .doOnNext(aLong -> System.out.println("flux 1 : " + aLong));

Flux<Long> flux2 = Flux
        .<Long>create(fluxSink -> {
            for (long i = 0; i < 20; i++) {
                fluxSink.next(i);
            }
            fluxSink.complete(); // signal completion; otherwise block() never returns
        })
        .filter(aLong -> aLong % 2 == 1)
        .doOnNext(aLong -> System.out.println("flux 2 : " + aLong));

Flux.merge(flux1, flux2)
        .doOnNext(System.out::println)
        .then()
        .block();

I create two Flux<Long> instances as in the code above.

flux1 produces the even numbers (0, 2, 4, 6, 8, ...) and flux2 produces the odd numbers (1, 3, 5, 7, 9, ...).

I expected that merging flux1 and flux2 would interleave their values, for example

0, 1, 2, 3, 4, ... or 0, 2, 1, 3, 4, ..., depending on how the work is scheduled.

Instead, flux1 is always consumed completely before flux2 starts: (flux1 start) 0, 2, 4, 6, 8, ..., 16, 18 (flux1 end) (flux2 start) 1, 3, 5, 7, ..., 17, 19.

How can I subscribe to multiple Flux eagerly so that their events interleave?


Solution

  • Both streams run on the same thread. When you subscribe, flux1 starts pushing data and keeps the thread busy until it has emitted everything. Only then is the thread free for flux2 to start. The merge operator emits values in the order they arrive; it does not alternate between the first and the second stream.

    If you want the streams to run concurrently, you need to run them on different threads, e.g. by using the publishOn operator.

    Flux<Long> flux1 = Flux
        .<Long>create(fluxSink -> {
            for (long i = 0; i < 20; i++) {
                fluxSink.next(i);
            }
            fluxSink.complete(); // signal completion so the merged Flux can terminate
        })
        .publishOn(Schedulers.newSingle("thread-x"))
        .filter(aLong -> aLong % 2 == 0)
        .doOnNext(aLong -> System.out.println("flux 1 : " + aLong));
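To see the whole thing end to end, here is a minimal sketch that applies publishOn to both sources and merges them. It assumes reactor-core is on the classpath. The class name EagerMergeSketch, the helper methods numbers and mergeOnSeparateThreads, and the second thread name "thread-y" are made up for illustration and do not come from the question.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class EagerMergeSketch {

    // Hypothetical helper: emits 0..19, then keeps only the values whose
    // remainder modulo 2 matches, processing them on the given scheduler.
    static Flux<Long> numbers(String label, long remainder, Scheduler scheduler) {
        return Flux.<Long>create(sink -> {
                    for (long i = 0; i < 20; i++) {
                        sink.next(i);
                    }
                    sink.complete(); // without this the merged Flux never terminates
                })
                .publishOn(scheduler) // everything below runs on the scheduler's thread
                .filter(n -> n % 2 == remainder)
                .doOnNext(n -> System.out.println(
                        label + " on " + Thread.currentThread().getName() + " : " + n));
    }

    // Merges the even and odd sources, each on its own single-threaded scheduler,
    // and returns the values in the order the merge emitted them.
    static List<Long> mergeOnSeparateThreads() {
        Scheduler s1 = Schedulers.newSingle("thread-x");
        Scheduler s2 = Schedulers.newSingle("thread-y");
        try {
            return Flux.merge(numbers("flux 1", 0, s1), numbers("flux 2", 1, s2))
                    .collectList()
                    .block();
        } finally {
            // Release the dedicated threads once the pipeline has finished.
            s1.dispose();
            s2.dispose();
        }
    }

    public static void main(String[] args) {
        System.out.println(mergeOnSeparateThreads());
    }
}
```

The relative order of even and odd values in the result is not deterministic. With only 20 elements, one thread may well finish before the other has started, so the output can still look sequential on some runs. Only the order within each source is preserved. Note also that with publishOn the emitting loop inside create still runs on the subscribing thread; if the loop itself should run elsewhere, subscribeOn is the operator that moves it.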