Is it possible to follow (concat) a Flux with the result of some reduce function applied to it, for example the number of elements that passed through?
That is, for example, to turn
    Flux.fromStream(Stream.of("a", "b", "c")) // note: the source flux can be read only once
into a Flux that emits
    "a", "b", "c", "3"
For example:
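    import java.util.stream.Stream;
    import reactor.core.publisher.ConnectableFlux;
    import reactor.core.publisher.Flux;

    public static void main(String[] args) {
        Flux<String> source = Flux.fromStream(Stream.of("a", "b", "c"));
        // Share one upstream subscription; connect only once 2 subscribers are present.
        Flux<String> fork = source.publish().autoConnect(2);
        // Subscriber 1: count the elements and cache the result for a late subscriber.
        ConnectableFlux<Long> counter = fork.count().flux().replay(1);
        counter.connect();
        // Subscriber 2: the elements themselves, followed by the cached count.
        Flux<String> result = fork.concatWith(counter.map(Object::toString));
        result.subscribe(System.out::println); // prints a, b, c, 3 (one per line)
    }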
Notes:
.publish() is used to broadcast the same sequence to multiple subscribers. autoConnect(2) prevents an early subscription and consumption of the sequence. Exactly 2 subscribers are expected.
Flux.concat() subscribes lazily: the second subscription is created once the first sequence completes, but in your case you need two active subscribers for the same source sequence at the same time. This is solved by creating a ConnectableFlux and explicitly calling its connect() method. (There is no ConnectableMono class, thus the counter is converted from Mono to a Flux.) The value is obtained early and is cached, and will be replayed once Flux.concat() subscribes to it.
Regarding the issue with Flux.concat() and its laziness: I just noticed that Flux.mergeSequential() is documented as an alternative to concat that subscribes eagerly.
Thus my code can be simplified by using mergeSequential() instead of concat():
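    import java.util.stream.Stream;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public static void main(String[] args) {
        Flux<String> source = Flux.fromStream(Stream.of("a", "b", "c"));
        Flux<String> fork = source.publish().autoConnect(2);
        Mono<Long> counter = fork.count();
        // Both arguments are subscribed to eagerly, but their values are emitted in order.
        Flux<String> result = Flux.mergeSequential(fork, counter.map(Object::toString));
        result.subscribe(System.out::println); // prints a, b, c, 3 (one per line)
    }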
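If you want to check the result rather than just print it, the mergeSequential() variant could be wrapped in a small helper, roughly like the sketch below. The class name AppendCountExample and the method name appendCount are made up for illustration; only the Reactor calls (publish, autoConnect, count, mergeSequential, collectList, block) are taken from the library. It assumes reactor-core is on the classpath and that the returned Flux is subscribed to only once.

```java
import java.util.List;
import java.util.stream.Stream;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical wrapper class; only the Reactor calls come from the answer above.
class AppendCountExample {

    // Returns the source elements followed by their count, rendered as a String.
    // The source is subscribed to only once, so a one-shot Stream-backed Flux is fine.
    static Flux<String> appendCount(Flux<String> source) {
        // Upstream is subscribed to only when both downstream subscribers have arrived.
        Flux<String> fork = source.publish().autoConnect(2);
        Mono<String> count = fork.count().map(Object::toString);
        // mergeSequential subscribes to both arguments eagerly but emits them in order.
        return Flux.mergeSequential(fork, count);
    }

    public static void main(String[] args) {
        List<String> out = appendCount(Flux.fromStream(Stream.of("a", "b", "c")))
                .collectList()
                .block();
        System.out.println(out); // prints [a, b, c, 3]
    }
}
```

Collecting into a list with collectList().block() is only there to make the outcome easy to assert on; in reactive code you would normally keep the Flux and subscribe to it as in the snippets above.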