java spring reactive-programming spring-webflux project-reactor

Concat Flux with its count (or another reduce function)


Is it possible to append (concat) to a Flux the result of a reduce function applied to that same Flux, for example the number of elements that passed through it?

That is, for example, to turn a

Flux.fromStream(Stream.of("a", "b", "c")) // note: the source Flux can be read only once

into a Flux that emits

"a", "b", "c", "3"

Solution

  • For example:

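    import java.util.stream.Stream;
    import reactor.core.publisher.ConnectableFlux;
    import reactor.core.publisher.Flux;

    public static void main(String[] args) {
      Flux<String> source = Flux.fromStream(Stream.of("a", "b", "c"));

      // Broadcast the one-shot source; it starts only once 2 subscribers have arrived.
      Flux<String> fork = source.publish().autoConnect(2);
      // Subscriber 1: count the elements and cache the result for a late subscriber.
      ConnectableFlux<Long> counter = fork.count().flux().replay(1);
      counter.connect();

      // Subscriber 2: the elements themselves, followed by the cached count.
      Flux<String> result = fork.concatWith(counter.map(Object::toString));

      result.subscribe(System.out::println);
    }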
    

    Notes:

    1. .publish() broadcasts the same sequence to multiple subscribers. autoConnect(2) delays the connection to the source until exactly 2 subscribers have subscribed, so nothing is consumed early.
    2. Flux.concat() (and therefore concatWith()) subscribes lazily: the second subscription is created only once the first sequence completes. In your case, however, two subscribers must be active on the same source sequence at the same time. This is solved by creating a ConnectableFlux and explicitly calling its connect() method. (There is no ConnectableMono class, so the counter is converted from a Mono to a Flux.) The count is therefore computed while the elements flow, cached by replay(1), and replayed once the concatenation subscribes to it.

    Regarding the issue with Flux.concat() and its laziness: I just noticed that Flux.mergeSequential() is documented as an alternative to concat that subscribes eagerly.
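The difference in subscription timing can be observed directly. The following is a minimal sketch, assuming Reactor 3 is on the classpath; the class and method names are made up for illustration. Both sources are Flux.never(), so neither emits nor completes and only the moment of subscription is recorded.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

public class SubscriptionTimingDemo {

    // Returns the names of the sources that were subscribed to right after
    // the combined Flux itself was subscribed.
    static List<String> subscribedSources(boolean eager) {
        List<String> subscribed = new ArrayList<>();

        // Neither source ever emits or completes, so only subscription timing is observed.
        Flux<String> first = Flux.<String>never()
                .doOnSubscribe(s -> subscribed.add("first"));
        Flux<String> second = Flux.<String>never()
                .doOnSubscribe(s -> subscribed.add("second"));

        Flux<String> combined = eager
                ? Flux.mergeSequential(first, second)
                : Flux.concat(first, second);

        Disposable subscription = combined.subscribe();
        List<String> snapshot = new ArrayList<>(subscribed);
        subscription.dispose();
        return snapshot;
    }

    public static void main(String[] args) {
        System.out.println("concat:          " + subscribedSources(false));
        System.out.println("mergeSequential: " + subscribedSources(true));
    }
}
```

With concat() only the first source is subscribed, because the second one waits for a completion that never arrives; with mergeSequential() both are subscribed immediately, which is what autoConnect(2) needs.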

    Thus my code can be simplified by using mergeSequential() instead of concat():

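    import java.util.stream.Stream;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public static void main(String[] args) {
      Flux<String> source = Flux.fromStream(Stream.of("a", "b", "c"));

      // The source starts only once both subscribers below have arrived.
      Flux<String> fork = source.publish().autoConnect(2);
      Mono<Long> counter = fork.count();

      // mergeSequential() subscribes to both publishers eagerly but emits them in order.
      Flux<String> result = Flux.mergeSequential(fork, counter.map(Object::toString));

      result.subscribe(System.out::println);
    }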
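
Since the question also asks about reduce functions other than count, the same pattern can be wrapped in a small helper. This is only a sketch under assumptions: appendSummary is a hypothetical name, not a Reactor operator, and the function passed in is assumed to subscribe to the forked Flux exactly once (otherwise autoConnect(2) would see the wrong number of subscribers).

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class AppendSummaryDemo {

    // Hypothetical helper (not part of Reactor): emits every element of a
    // one-shot source, followed by a single value computed from the same elements.
    static <T> Flux<T> appendSummary(Flux<T> source,
                                     Function<Flux<T>, Mono<T>> summarize) {
        // The source starts only once both the pass-through and the summary have subscribed.
        Flux<T> fork = source.publish().autoConnect(2);
        return Flux.mergeSequential(fork, summarize.apply(fork));
    }

    // Elements followed by their count, as in the question.
    static List<String> withCount() {
        Flux<String> source = Flux.fromStream(Stream.of("a", "b", "c"));
        return appendSummary(source, f -> f.count().map(Object::toString))
                .collectList()
                .block();
    }

    // Elements followed by a different reduction: their concatenation.
    static List<String> withConcatenation() {
        Flux<String> source = Flux.fromStream(Stream.of("a", "b", "c"));
        return appendSummary(source, f -> f.reduce(String::concat))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(withCount());
        System.out.println(withConcatenation());
    }
}
```

Each call builds a fresh source, because a Flux created with fromStream() can be consumed only once.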