Search code examples
javareactive-programmingspring-webfluxproject-reactorreactive-streams

Project Reactor what are differences between flux conCat, flux mergeSequential, flux mergeOrdered


All of these methods will produce the same result if we provide same data source. So what are differences between them?


Solution

  • Take the following (contrived) concat() example with two publishers that emit 3 elements at 100ms intervals:

    Flux<Integer> a = Flux.range(0, 3).delayElements(Duration.ofMillis(100));
    Flux<Integer> b = Flux.range(0, 3).delayElements(Duration.ofMillis(100));
    
    Flux.concat(a, b)
            .timed()
            .doOnNext(x -> System.out.println(x.get() + ": " + x.elapsed().toMillis()))
            .blockLast();
    

    Here you'll see something similar to the following as output:

    0: 138
    1: 107
    2: 108
    0: 111
    1: 102
    2: 107
    

    So we have 6 elements emitted, at 100ms intervals. The first publisher is subscribed to, emits 3 elements at 100ms intervals, then completes. The second publisher is then subscribed to, emits its 3 elements at 100ms intervals, then it completes.

    If we instead replace concat() with mergeSequential(), you'll see something like:

    0: 118
    1: 107
    2: 107
    0: 0
    1: 0
    2: 0
    

    The elements are emitted in the same order - but look at the timings on the last 3! This is because the behaviour is slightly different - in this case, both publishers are subscribed to and so begin emitting elements at 100ms intervals. The elements from the first publisher are emitted as they're received, but the elements from the second publisher are cached until the first publisher completes.

    When the first publisher does complete, the second publisher then takes over - all the elements we cached are emitted immediately, hence there's no delay (so the timings are zero.) We've got the same elements emitted, but much faster.

    This seems advantageous, but there's two main reasons you might not want to just jump to relying on mergeSequential() rather than concat():

    • Caching all the elements behind the scenes takes memory. In this example with 3 elements that's barely any memory footprint, but if you start dealing with millions of elements (or even a publisher that may never complete), you can very quickly run out of memory.
    • Subscribing immediately may well change the behaviour. Take two publishers - one that mutates a value in a database, and another that reads it. If you concatenate those, the write will always happen before the read. If you subscribe to both at the same time, that's not the case, you'll probably read it before it's been written (but not necessarily.)

    For the above two reasons, in my experience you usually want to just use concat() rather than mergeSequential() in real-world use.

    As for mergeOrdered(), use that in the above example and you'll see the actual order of the elements differs:

    0: 127
    0: 105
    1: 17
    1: 90
    2: 15
    2: 0
    

    Here the eager subscription part of mergeSequential() is the same, but with one other twist - the values emitted from each publisher are compared as they're emitted, and the smallest one emitted first. Hence you'll see (in this case) an ordered stream of digits: 0,0,1,1,2,2. Note the timings appear different to mergeSequential() as it's interleaving values from both publishers in your final output, not just merging them sequentially.